niupre-reuse-connections-for-logical-ref-fkeys
Nitish Upreti 2022-09-14 14:59:28 -07:00
parent 17ae71fa8f
commit 4d4d941418
6 changed files with 25 additions and 15 deletions

View File

@ -248,7 +248,7 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* GetLocalConnectionForSubtransaction establishes a localhost connection. * GetLocalConnectionForSubtransaction establishes a localhost connection.
* To avoid creating excessive connections, we try to reuse an existing connection. * To avoid creating excessive connections, we try to reuse an existing connection.
*/ */
MultiConnection* MultiConnection *
GetLocalConnectionForSubtransactionAsUser(char *userName) GetLocalConnectionForSubtransactionAsUser(char *userName)
{ {
int connectionFlag = OUTSIDE_TRANSACTION; int connectionFlag = OUTSIDE_TRANSACTION;
@ -258,6 +258,7 @@ GetLocalConnectionForSubtransactionAsUser(char *userName)
userName, userName,
get_database_name( get_database_name(
MyDatabaseId)); MyDatabaseId));
/* Don't cache connection for the lifetime of the entire session. */ /* Don't cache connection for the lifetime of the entire session. */
connection->forceCloseAtTransactionEnd = true; connection->forceCloseAtTransactionEnd = true;

View File

@ -670,9 +670,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
nodeGroupId, nodeGroupId,
policy); policy);
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection, SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data)); list_make1(command->data));
} }
@ -690,9 +691,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
PG_DIST_CLEANUP, PG_DIST_CLEANUP,
recordId); recordId);
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection, SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data)); list_make1(command->data));
} }
@ -754,9 +756,11 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName
dropQuery->data); dropQuery->data);
/* remove the shard from the node */ /* remove the shard from the node */
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(connection, CitusExtensionOwnerName());
dropCommandList); bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
connection,
dropCommandList);
return success; return success;
} }

View File

@ -2245,7 +2245,8 @@ GetNextShardIdForSplitChild()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
"pg_catalog.pg_dist_shardid_seq")); "pg_catalog.pg_dist_shardid_seq"));
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
CitusExtensionOwnerName());
PGresult *result = NULL; PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result); &result);

View File

@ -357,7 +357,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
*/ */
void void
SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection,
List *commandList) List *commandList)
{ {
MarkRemoteTransactionCritical(workerConnection); MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection); RemoteTransactionBegin(workerConnection);
@ -456,7 +456,8 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
* *
*/ */
bool bool
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, List *commandList) SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection, List *commandList)
{ {
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{ {
@ -490,6 +491,7 @@ SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection
return !failed; return !failed;
} }
/* /*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command * SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the * list to the given worker in a single transaction that is outside of the
@ -505,8 +507,9 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort, nodeName, nodePort,
nodeUser, NULL); nodeUser, NULL);
bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(workerConnection, bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
commandList); workerConnection,
commandList);
CloseConnection(workerConnection); CloseConnection(workerConnection);
return !failed; return !failed;

View File

@ -289,7 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user, int32 port, const char *user,
const char *database); const char *database);
extern MultiConnection* GetLocalConnectionForSubtransactionAsUser(char* userName); extern MultiConnection * GetLocalConnectionForSubtransactionAsUser(char *userName);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname, const char *hostname,
int32 port, int32 port,

View File

@ -62,7 +62,8 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
List *commandList); List *commandList);
extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection( extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection, MultiConnection *workerConnection,
List *commandList); List *
commandList);
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
char *nodeName, char *nodeName,
int32 nodePort, int32 nodePort,