diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 1dfd51781..3271d158f 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -573,6 +573,178 @@ SendRemoteCommand(MultiConnection *connection, const char *command) } +/* + * EnterRemotePipelineMode puts connection into pipeline mode. + */ +int +EnterRemotePipelineMode(MultiConnection *connection) +{ + PGconn *pgConn = connection->pgConn; + + /* + * Don't try to send command if connection is entirely gone + * (PQisnonblocking() would crash). + */ + if (!pgConn || PQstatus(pgConn) != CONNECTION_OK) + { + return 0; + } + + Assert(PQisnonblocking(pgConn)); + Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_OFF); + + int rc = PQenterPipelineMode(pgConn); + + return rc; +} + + +/* + * SyncRemotePipelineMode syncs the pipeline in the connection. + */ +int +SyncRemotePipelineMode(MultiConnection *connection) +{ + PGconn *pgConn = connection->pgConn; + + /* + * Don't try to send command if connection is entirely gone + * (PQisnonblocking() would crash). + */ + if (!pgConn || PQstatus(pgConn) != CONNECTION_OK) + { + return 0; + } + + Assert(PQisnonblocking(pgConn)); + Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_ON); + + int rc = PQpipelineSync(pgConn); + + return rc; +} + + +/* + * ExitRemotePipelineMode puts connection out of pipeline mode. + */ +int +ExitRemotePipelineMode(MultiConnection *connection) +{ + PGconn *pgConn = connection->pgConn; + + /* + * Don't try to send command if connection is entirely gone + * (PQisnonblocking() would crash). + */ + if (!pgConn || PQstatus(pgConn) != CONNECTION_OK) + { + return 0; + } + + Assert(PQisnonblocking(pgConn)); + Assert(PQpipelineStatus(pgConn) == PQ_PIPELINE_ON); + + int rc = PQexitPipelineMode(pgConn); + + return rc; +} + + +#if PG_VERSION_NUM >= PG_VERSION_14 + +/* + * ExecuteRemoteCommandsInConnectionsInPipelineMode executes commands in pipeline mode + * for given connections. + */ +void +ExecuteRemoteCommandsInConnectionsInPipelineMode(List *connections, List *commands) +{ + MultiConnection *connection = NULL; + foreach_ptr(connection, connections) + { + ExecuteRemoteCommandsInPipelineMode(connection, commands); + } +} + + +/* + * ExecuteRemoteCommandsInPipelineMode executes commands in pipeline mode for given + * connection. + */ +void +ExecuteRemoteCommandsInPipelineMode(MultiConnection *connection, List *commands) +{ + /* PQtrace(connection->pgConn, stdout); */ + /* PQsetTraceFlags(connection->pgConn, */ + /* PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE); */ + + /* start pipeline mode */ + int rc = EnterRemotePipelineMode(connection); + if (rc != 1) + { + ReportConnectionError(connection, ERROR); + } + Assert(PQpipelineStatus(connection->pgConn) == PQ_PIPELINE_ON); + + /* send commands */ + char *command = NULL; + foreach_ptr(command, commands) + { + SendRemoteCommandParams(connection, command, 0, NULL, NULL, 0); + } + + /* create sync point in pipeline which would flush send buffer */ + rc = SyncRemotePipelineMode(connection); + if (rc != 1) + { + ReportConnectionError(connection, ERROR); + } + + /* start processing the results */ + bool raiseInterrupts = true; + PGresult *result = NULL; + int i = 0; + for (i = 0; i < list_length(commands); i++) + { + /* process i th command */ + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); + + /* marks end of i th command */ + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (result != NULL) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); + } + + /* sync response */ + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (PQresultStatus(result) != PGRES_PIPELINE_SYNC) + { + ReportResultError(connection, result, ERROR); + } + PQclear(result); + + /* exit pipeline mode */ + rc = ExitRemotePipelineMode(connection); + if (rc != 1) + { + ReportConnectionError(connection, ERROR); + } + Assert(PQpipelineStatus(connection->pgConn) == PQ_PIPELINE_OFF); +} + + +#endif + + /* * ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and * checks if the result is equal to the expected result. If the result is equal to the diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 8d4a02b6d..3fc0116a2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -150,7 +150,6 @@ static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); static bool ExceedsSyncBatchCount(MetadataSyncContext *context); -static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); @@ -4071,35 +4070,6 @@ ExceedsSyncBatchCount(MetadataSyncContext *context) } -/* - * ExecuteCommandListOnWorkerList executes a command list on all nodes - * in the given list. - */ -static void -ExecuteCommandListOnWorkerList(List *commandList, List *workerList) -{ - List *taskList = NIL; - - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerList) - { - ShardPlacement *placement = CitusMakeNode(ShardPlacement); - SetPlacementNodeMetadata(placement, workerNode); - - Task *task = CitusMakeNode(Task); - task->taskType = DDL_TASK; - SetTaskQueryStringList(task, commandList); - task->taskPlacementList = list_make1(placement); - - taskList = lappend(taskList, task); - } - - bool localExecutionSupported = false; - - ExecuteUtilityTaskList(taskList, localExecutionSupported); -} - - /* * ProcessBatchCommandsToActivatedNodes processes collected commands inside * metadataSyncContext. Sends commands to activated workers if batch limit @@ -4124,7 +4094,7 @@ ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool forceSen /* - * ProcessBatchCommandsToSingleNode does the same operation as + * ProcessBatchCommandsToMetadataNodes does the same operation as * ProcessBatchCommandsToActivatedNodes but for metadata nodes. */ void @@ -4223,16 +4193,18 @@ SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context) if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) { List *workerNodes = context->activatedWorkerNodeList; - ExecuteCommandListOnWorkerList(commands, workerNodes); - - /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, */ - /* CurrentUserName(), */ - /* commands); */ + SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, + CurrentUserName(), + commands); } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { - List *workerConnections = context->activatedWorkerBareConnections; - SendCommandListToWorkerListWithBareConnections(workerConnections, commands); + List *connections = context->activatedWorkerBareConnections; + #if PG_VERSION_NUM >= PG_VERSION_14 + ExecuteRemoteCommandsInConnectionsInPipelineMode(connections, commands); + #else + SendCommandListToWorkerListWithBareConnections(connections, commands); + #endif } else { @@ -4266,11 +4238,9 @@ SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context) { List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, RowShareLock); - ExecuteCommandListOnWorkerList(commands, metadataNodes); - - /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, */ - /* CurrentUserName(), */ - /* commands); */ + SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, + CurrentUserName(), + commands); } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { @@ -4310,11 +4280,9 @@ SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx) Assert(nodeIdx < list_length(workerNodes)); WorkerNode *node = list_nth(workerNodes, nodeIdx); - ExecuteCommandListOnWorkerList(commands, list_make1(node)); - - /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), */ - /* CurrentUserName(), */ - /* commands); */ + SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), + CurrentUserName(), + commands); } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 71cb9dad2..9b4269e64 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -53,6 +53,15 @@ extern bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command, PGresult **result); +#if PG_VERSION_NUM >= PG_VERSION_14 +extern void ExecuteRemoteCommandsInConnectionsInPipelineMode(List *connections, + List *commands); +extern void ExecuteRemoteCommandsInPipelineMode(MultiConnection *connection, + List *commands); +extern int EnterRemotePipelineMode(MultiConnection *connection); +extern int SyncRemotePipelineMode(MultiConnection *connection); +extern int ExitRemotePipelineMode(MultiConnection *connection); +#endif extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, int parameterCount, const Oid *parameterTypes,