Make use of psql pipelining.

metadata-command-batch
aykutbozkurt 2023-04-03 14:59:09 +03:00
parent cee759d66d
commit 2a030dd286
3 changed files with 197 additions and 48 deletions

View File

@ -573,6 +573,178 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
} }
/*
* EnterRemotePipelineMode puts connection into pipeline mode.
*/
int
EnterRemotePipelineMode(MultiConnection *connection)
{
PGconn *pgConn = connection->pgConn;
/*
* Don't try to send command if connection is entirely gone
* (PQisnonblocking() would crash).
*/
if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
{
return 0;
}
Assert(PQisnonblocking(pgConn));
Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_OFF);
int rc = PQenterPipelineMode(pgConn);
return rc;
}
/*
* SyncRemotePipelineMode syncs the pipeline in the connection.
*/
int
SyncRemotePipelineMode(MultiConnection *connection)
{
PGconn *pgConn = connection->pgConn;
/*
* Don't try to send command if connection is entirely gone
* (PQisnonblocking() would crash).
*/
if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
{
return 0;
}
Assert(PQisnonblocking(pgConn));
Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_ON);
int rc = PQpipelineSync(pgConn);
return rc;
}
/*
* ExitRemotePipelineMode puts connection out of pipeline mode.
*/
int
ExitRemotePipelineMode(MultiConnection *connection)
{
PGconn *pgConn = connection->pgConn;
/*
* Don't try to send command if connection is entirely gone
* (PQisnonblocking() would crash).
*/
if (!pgConn || PQstatus(pgConn) != CONNECTION_OK)
{
return 0;
}
Assert(PQisnonblocking(pgConn));
Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_ON);
int rc = PQexitPipelineMode(pgConn);
return rc;
}
#if PG_VERSION_NUM >= PG_VERSION_14
/*
* ExecuteRemoteCommandsInConnectionsInPipelineMode executes commands in pipeline mode
* for given connections.
*/
void
ExecuteRemoteCommandsInConnectionsInPipelineMode(List *connections, List *commands)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connections)
{
ExecuteRemoteCommandsInPipelineMode(connection, commands);
}
}
/*
* ExecuteRemoteCommandsInPipelineMode executes commands in pipeline mode for given
* connection.
*/
void
ExecuteRemoteCommandsInPipelineMode(MultiConnection *connection, List *commands)
{
/* PQtrace(connection->pgConn, stdout); */
/* PQsetTraceFlags(connection->pgConn, */
/* PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE); */
/* start pipeline mode */
int rc = EnterRemotePipelineMode(connection);
if (rc != 1)
{
ReportConnectionError(connection, ERROR);
}
Assert(PQpipelineStatus(connection->pgConn) == PQ_PIPELINE_ON);
/* send commands */
char *command = NULL;
foreach_ptr(command, commands)
{
SendRemoteCommandParams(connection, command, 0, NULL, NULL, 0);
}
/* create sync point in pipeline which would flush send buffer */
rc = SyncRemotePipelineMode(connection);
if (rc != 1)
{
ReportConnectionError(connection, ERROR);
}
/* start processing the results */
bool raiseInterrupts = true;
PGresult *result = NULL;
int i = 0;
for (i = 0; i < list_length(commands); i++)
{
/* process i th command */
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
/* marks end of i th command */
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (result != NULL)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
}
/* sync response */
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_PIPELINE_SYNC)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
/* exit pipeline mode */
rc = ExitRemotePipelineMode(connection);
if (rc != 1)
{
ReportConnectionError(connection, ERROR);
}
Assert(PQpipelineStatus(connection->pgConn) == PQ_PIPELINE_OFF);
}
#endif
/* /*
* ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and * ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and
* checks if the result is equal to the expected result. If the result is equal to the * checks if the result is equal to the expected result. If the result is equal to the

View File

@ -150,7 +150,6 @@ static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId); static char * RemoteCollationIdExpression(Oid colocationId);
static bool ExceedsSyncBatchCount(MetadataSyncContext *context); static bool ExceedsSyncBatchCount(MetadataSyncContext *context);
static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
@ -4071,35 +4070,6 @@ ExceedsSyncBatchCount(MetadataSyncContext *context)
} }
/*
* ExecuteCommandListOnWorkerList executes a command list on all nodes
* in the given list.
*/
static void
ExecuteCommandListOnWorkerList(List *commandList, List *workerList)
{
List *taskList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(placement, workerNode);
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
SetTaskQueryStringList(task, commandList);
task->taskPlacementList = list_make1(placement);
taskList = lappend(taskList, task);
}
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
/* /*
* ProcessBatchCommandsToActivatedNodes processes collected commands inside * ProcessBatchCommandsToActivatedNodes processes collected commands inside
* metadataSyncContext. Sends commands to activated workers if batch limit * metadataSyncContext. Sends commands to activated workers if batch limit
@ -4124,7 +4094,7 @@ ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool forceSen
/* /*
* ProcessBatchCommandsToSingleNode does the same operation as * ProcessBatchCommandsToMetadataNodes does the same operation as
* ProcessBatchCommandsToActivatedNodes but for metadata nodes. * ProcessBatchCommandsToActivatedNodes but for metadata nodes.
*/ */
void void
@ -4223,16 +4193,18 @@ SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context)
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{ {
List *workerNodes = context->activatedWorkerNodeList; List *workerNodes = context->activatedWorkerNodeList;
ExecuteCommandListOnWorkerList(commands, workerNodes); SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes,
CurrentUserName(),
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, */ commands);
/* CurrentUserName(), */
/* commands); */
} }
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{ {
List *workerConnections = context->activatedWorkerBareConnections; List *connections = context->activatedWorkerBareConnections;
SendCommandListToWorkerListWithBareConnections(workerConnections, commands); #if PG_VERSION_NUM >= PG_VERSION_14
ExecuteRemoteCommandsInConnectionsInPipelineMode(connections, commands);
#else
SendCommandListToWorkerListWithBareConnections(connections, commands);
#endif
} }
else else
{ {
@ -4266,11 +4238,9 @@ SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context)
{ {
List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock); RowShareLock);
ExecuteCommandListOnWorkerList(commands, metadataNodes); SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
CurrentUserName(),
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, */ commands);
/* CurrentUserName(), */
/* commands); */
} }
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{ {
@ -4310,11 +4280,9 @@ SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx)
Assert(nodeIdx < list_length(workerNodes)); Assert(nodeIdx < list_length(workerNodes));
WorkerNode *node = list_nth(workerNodes, nodeIdx); WorkerNode *node = list_nth(workerNodes, nodeIdx);
ExecuteCommandListOnWorkerList(commands, list_make1(node)); SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node),
CurrentUserName(),
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), */ commands);
/* CurrentUserName(), */
/* commands); */
} }
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{ {

View File

@ -53,6 +53,15 @@ extern bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection,
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
const char *command, const char *command,
PGresult **result); PGresult **result);
#if PG_VERSION_NUM >= PG_VERSION_14
extern void ExecuteRemoteCommandsInConnectionsInPipelineMode(List *connections,
List *commands);
extern void ExecuteRemoteCommandsInPipelineMode(MultiConnection *connection,
List *commands);
extern int EnterRemotePipelineMode(MultiConnection *connection);
extern int SyncRemotePipelineMode(MultiConnection *connection);
extern int ExitRemotePipelineMode(MultiConnection *connection);
#endif
extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes, int parameterCount, const Oid *parameterTypes,