From a59bf31c7632a28e6d7106c1c2eb082df19445e0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 22 Nov 2018 01:10:37 +0100 Subject: [PATCH] Use worker_execute_sql_task UDF in task-tracker executor --- .../executor/multi_task_tracker_executor.c | 22 +++++++++---------- .../distributed/multi_server_executor.h | 6 +++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 9bf24f6b3..724359b57 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -1622,28 +1622,28 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) StringInfo taskAssignmentQuery = NULL; /* - * We first wrap a copy out command around the original query string. This - * allows for the query's results to persist on the worker node after the - * query completes and for the executor to later use this persisted data. + * We first wrap the original query string in a worker_execute_sql_task + * call. This allows for the query's results to persist on the worker node + * after the query completes and for the executor to later fetch this + * persisted data using COPY ... (format 'transmit') */ - StringInfo jobDirectoryName = JobDirectoryName(task->jobId); - StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId); - StringInfo copyQueryString = makeStringInfo(); + StringInfo sqlTaskQueryString = makeStringInfo(); + char *escapedTaskQueryString = quote_literal_cstr(task->queryString); if (BinaryMasterCopyFormat) { - appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_BINARY, - task->queryString, taskFilename->data); + appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_BINARY, + task->jobId, task->taskId, escapedTaskQueryString); } else { - appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_TEXT, - task->queryString, taskFilename->data); + appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_TEXT, + task->jobId, task->taskId, escapedTaskQueryString); } /* wrap a task assignment query outside the copy out query */ - taskAssignmentQuery = TaskAssignmentQuery(task, copyQueryString->data); + taskAssignmentQuery = TaskAssignmentQuery(task, sqlTaskQueryString->data); taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); taskState->status = TASK_CLIENT_SIDE_QUEUED; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 496149f4f..70b40aa83 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -26,8 +26,10 @@ /* copy out query results */ #define COPY_QUERY_TO_STDOUT_TEXT "COPY (%s) TO STDOUT" #define COPY_QUERY_TO_STDOUT_BINARY "COPY (%s) TO STDOUT WITH (FORMAT binary)" -#define COPY_QUERY_TO_FILE_TEXT "COPY (%s) TO '%s'" -#define COPY_QUERY_TO_FILE_BINARY "COPY (%s) TO '%s' WITH (FORMAT binary)" +#define EXECUTE_SQL_TASK_TO_FILE_BINARY \ + "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)" +#define EXECUTE_SQL_TASK_TO_FILE_TEXT \ + "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)" /* Task tracker executor related defines */ #define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \