mirror of https://github.com/citusdata/citus.git
Use worker_execute_sql_task UDF in task-tracker executor
parent
30bad7e66f
commit
a59bf31c76
|
@ -1622,28 +1622,28 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
|
||||||
StringInfo taskAssignmentQuery = NULL;
|
StringInfo taskAssignmentQuery = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We first wrap a copy out command around the original query string. This
|
* We first wrap the original query string in a worker_execute_sql_task
|
||||||
* allows for the query's results to persist on the worker node after the
|
* call. This allows for the query's results to persist on the worker node
|
||||||
* query completes and for the executor to later use this persisted data.
|
* after the query completes and for the executor to later fetch this
|
||||||
|
* persisted data using COPY ... (format 'transmit')
|
||||||
*/
|
*/
|
||||||
StringInfo jobDirectoryName = JobDirectoryName(task->jobId);
|
|
||||||
StringInfo taskFilename = TaskFilename(jobDirectoryName, task->taskId);
|
|
||||||
|
|
||||||
StringInfo copyQueryString = makeStringInfo();
|
StringInfo sqlTaskQueryString = makeStringInfo();
|
||||||
|
char *escapedTaskQueryString = quote_literal_cstr(task->queryString);
|
||||||
|
|
||||||
if (BinaryMasterCopyFormat)
|
if (BinaryMasterCopyFormat)
|
||||||
{
|
{
|
||||||
appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_BINARY,
|
appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_BINARY,
|
||||||
task->queryString, taskFilename->data);
|
task->jobId, task->taskId, escapedTaskQueryString);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
appendStringInfo(copyQueryString, COPY_QUERY_TO_FILE_TEXT,
|
appendStringInfo(sqlTaskQueryString, EXECUTE_SQL_TASK_TO_FILE_TEXT,
|
||||||
task->queryString, taskFilename->data);
|
task->jobId, task->taskId, escapedTaskQueryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* wrap a task assignment query outside the copy out query */
|
/* wrap a task assignment query outside the copy out query */
|
||||||
taskAssignmentQuery = TaskAssignmentQuery(task, copyQueryString->data);
|
taskAssignmentQuery = TaskAssignmentQuery(task, sqlTaskQueryString->data);
|
||||||
|
|
||||||
taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId);
|
taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId);
|
||||||
taskState->status = TASK_CLIENT_SIDE_QUEUED;
|
taskState->status = TASK_CLIENT_SIDE_QUEUED;
|
||||||
|
|
|
@ -26,8 +26,10 @@
|
||||||
/* copy out query results */
|
/* copy out query results */
|
||||||
#define COPY_QUERY_TO_STDOUT_TEXT "COPY (%s) TO STDOUT"
|
#define COPY_QUERY_TO_STDOUT_TEXT "COPY (%s) TO STDOUT"
|
||||||
#define COPY_QUERY_TO_STDOUT_BINARY "COPY (%s) TO STDOUT WITH (FORMAT binary)"
|
#define COPY_QUERY_TO_STDOUT_BINARY "COPY (%s) TO STDOUT WITH (FORMAT binary)"
|
||||||
#define COPY_QUERY_TO_FILE_TEXT "COPY (%s) TO '%s'"
|
#define EXECUTE_SQL_TASK_TO_FILE_BINARY \
|
||||||
#define COPY_QUERY_TO_FILE_BINARY "COPY (%s) TO '%s' WITH (FORMAT binary)"
|
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)"
|
||||||
|
#define EXECUTE_SQL_TASK_TO_FILE_TEXT \
|
||||||
|
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)"
|
||||||
|
|
||||||
/* Task tracker executor related defines */
|
/* Task tracker executor related defines */
|
||||||
#define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \
|
#define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \
|
||||||
|
|
Loading…
Reference in New Issue