From 98abd68178d8f2b6d9a9a89fdc1e0781fe3421c3 Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Fri, 17 Mar 2023 21:39:51 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a method to send multiple commands to worker list reusing the same bare connections. Change will be useful for metadata sync api. --- .../transaction/worker_transaction.c | 48 +++++++++++++++++++ src/include/distributed/worker_transaction.h | 2 + 2 files changed, 50 insertions(+) diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 486dd7280..ffaab9e5a 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -374,6 +374,54 @@ SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerC } +/* + * SendCommandListToWorkerListWithBareConnections sends the command list + * over the specified bare connections. This function is mainly useful to + * avoid opening an closing connections excessively by allowing reusing + * connections to send multiple separate bare commands. The function + * raises an error if any of the queries fail. + */ +void +SendCommandListToWorkerListWithBareConnections(List *workerConnectionList, + List *commandList) +{ + Assert(!InCoordinatedTransaction()); + Assert(!GetCoordinatedTransactionShouldUse2PC()); + + if (list_length(commandList) == 0 || list_length(workerConnectionList) == 0) + { + /* nothing to do */ + return; + } + + /* + * In order to avoid round-trips per query in queryStringList, + * we join the string and send as a single command. Also, + * if there is only a single command, avoid additional call to + * StringJoin given that some strings can be quite large. + */ + char *stringToSend = (list_length(commandList) == 1) ? + linitial(commandList) : StringJoin(commandList, ';'); + + /* send commands in parallel */ + MultiConnection *connection = NULL; + foreach_ptr(connection, workerConnectionList) + { + int querySent = SendRemoteCommand(connection, stringToSend); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + bool failOnError = true; + foreach_ptr(connection, workerConnectionList) + { + ClearResults(connection, failOnError); + } +} + + /* * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node * with the given nodeName and nodePort. The commands are sent as part of the diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index aa137b76b..be8fe5ed6 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -82,6 +82,8 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, extern void SendCommandListToWorkerOutsideTransactionWithConnection( MultiConnection *workerConnection, List *commandList); +extern void SendCommandListToWorkerListWithBareConnections(List *workerConnections, + List *commandList); extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( List *workerNodeList, const char *