mirror of https://github.com/citusdata/citus.git
PR #6728 / commit - 1
Add a method to send multiple commands to worker list reusing the same bare connections. Change will be useful for metadata sync api.pull/6728/head
parent
e71bfd6074
commit
98abd68178
|
@ -374,6 +374,54 @@ SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SendCommandListToWorkerListWithBareConnections sends the command list
|
||||||
|
* over the specified bare connections. This function is mainly useful to
|
||||||
|
* avoid opening an closing connections excessively by allowing reusing
|
||||||
|
* connections to send multiple separate bare commands. The function
|
||||||
|
* raises an error if any of the queries fail.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SendCommandListToWorkerListWithBareConnections(List *workerConnectionList,
|
||||||
|
List *commandList)
|
||||||
|
{
|
||||||
|
Assert(!InCoordinatedTransaction());
|
||||||
|
Assert(!GetCoordinatedTransactionShouldUse2PC());
|
||||||
|
|
||||||
|
if (list_length(commandList) == 0 || list_length(workerConnectionList) == 0)
|
||||||
|
{
|
||||||
|
/* nothing to do */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In order to avoid round-trips per query in queryStringList,
|
||||||
|
* we join the string and send as a single command. Also,
|
||||||
|
* if there is only a single command, avoid additional call to
|
||||||
|
* StringJoin given that some strings can be quite large.
|
||||||
|
*/
|
||||||
|
char *stringToSend = (list_length(commandList) == 1) ?
|
||||||
|
linitial(commandList) : StringJoin(commandList, ';');
|
||||||
|
|
||||||
|
/* send commands in parallel */
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
foreach_ptr(connection, workerConnectionList)
|
||||||
|
{
|
||||||
|
int querySent = SendRemoteCommand(connection, stringToSend);
|
||||||
|
if (querySent == 0)
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool failOnError = true;
|
||||||
|
foreach_ptr(connection, workerConnectionList)
|
||||||
|
{
|
||||||
|
ClearResults(connection, failOnError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendCommandListToWorkerInCoordinatedTransaction opens connection to the node
|
* SendCommandListToWorkerInCoordinatedTransaction opens connection to the node
|
||||||
* with the given nodeName and nodePort. The commands are sent as part of the
|
* with the given nodeName and nodePort. The commands are sent as part of the
|
||||||
|
|
|
@ -82,6 +82,8 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
|
||||||
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
|
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||||
MultiConnection *workerConnection,
|
MultiConnection *workerConnection,
|
||||||
List *commandList);
|
List *commandList);
|
||||||
|
extern void SendCommandListToWorkerListWithBareConnections(List *workerConnections,
|
||||||
|
List *commandList);
|
||||||
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
||||||
List *workerNodeList,
|
List *workerNodeList,
|
||||||
const char *
|
const char *
|
||||||
|
|
Loading…
Reference in New Issue