mirror of https://github.com/citusdata/citus.git
store query command list in task (#3649)
Sometimes we have concatenated query strings for a task. However, when we want to find each query string, it is not a trivial task. Therefore, it makes sense to store this in task so that when we need each query string we can easily get it.pull/3651/head
parent
4686133bf2
commit
dd1a456407
|
@ -927,7 +927,6 @@ List *
|
||||||
NodeDDLTaskList(TargetWorkerSet targets, List *commands)
|
NodeDDLTaskList(TargetWorkerSet targets, List *commands)
|
||||||
{
|
{
|
||||||
List *workerNodes = TargetWorkerSetNodeList(targets, NoLock);
|
List *workerNodes = TargetWorkerSetNodeList(targets, NoLock);
|
||||||
char *concatenatedCommands = StringJoin(commands, ';');
|
|
||||||
|
|
||||||
if (list_length(workerNodes) <= 0)
|
if (list_length(workerNodes) <= 0)
|
||||||
{
|
{
|
||||||
|
@ -940,7 +939,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
|
||||||
|
|
||||||
Task *task = CitusMakeNode(Task);
|
Task *task = CitusMakeNode(Task);
|
||||||
task->taskType = DDL_TASK;
|
task->taskType = DDL_TASK;
|
||||||
SetTaskQueryString(task, concatenatedCommands);
|
SetTaskQueryStringList(task, commands);
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodes)
|
foreach_ptr(workerNode, workerNodes)
|
||||||
|
|
|
@ -513,7 +513,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
||||||
task->jobId = INVALID_JOB_ID;
|
task->jobId = INVALID_JOB_ID;
|
||||||
task->taskId = taskId++;
|
task->taskId = taskId++;
|
||||||
task->taskType = DDL_TASK;
|
task->taskType = DDL_TASK;
|
||||||
SetTaskQueryString(task, StringJoin(commandList, ';'));
|
SetTaskQueryStringList(task, commandList);
|
||||||
|
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->dependentTaskList = NIL;
|
task->dependentTaskList = NIL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
|
|
|
@ -445,6 +445,17 @@ SetTaskQueryString(Task *task, char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetTaskQueryStringList sets the queryStringList of the given task.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SetTaskQueryStringList(Task *task, List *queryStringList)
|
||||||
|
{
|
||||||
|
task->queryStringList = queryStringList;
|
||||||
|
SetTaskQueryString(task, StringJoin(queryStringList, ';'));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeparseTaskQuery is a general way of deparsing a query based on a task.
|
* DeparseTaskQuery is a general way of deparsing a query based on a task.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -260,6 +260,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
||||||
COPY_SCALAR_FIELD(taskId);
|
COPY_SCALAR_FIELD(taskId);
|
||||||
COPY_NODE_FIELD(queryForLocalExecution);
|
COPY_NODE_FIELD(queryForLocalExecution);
|
||||||
COPY_STRING_FIELD(queryStringLazy);
|
COPY_STRING_FIELD(queryStringLazy);
|
||||||
|
COPY_NODE_FIELD(queryStringList);
|
||||||
COPY_SCALAR_FIELD(anchorDistributedTableId);
|
COPY_SCALAR_FIELD(anchorDistributedTableId);
|
||||||
COPY_SCALAR_FIELD(anchorShardId);
|
COPY_SCALAR_FIELD(anchorShardId);
|
||||||
COPY_NODE_FIELD(taskPlacementList);
|
COPY_NODE_FIELD(taskPlacementList);
|
||||||
|
|
|
@ -481,6 +481,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_UINT_FIELD(taskId);
|
WRITE_UINT_FIELD(taskId);
|
||||||
WRITE_NODE_FIELD(queryForLocalExecution);
|
WRITE_NODE_FIELD(queryForLocalExecution);
|
||||||
WRITE_STRING_FIELD(queryStringLazy);
|
WRITE_STRING_FIELD(queryStringLazy);
|
||||||
|
WRITE_NODE_FIELD(queryStringList);
|
||||||
WRITE_OID_FIELD(anchorDistributedTableId);
|
WRITE_OID_FIELD(anchorDistributedTableId);
|
||||||
WRITE_UINT64_FIELD(anchorShardId);
|
WRITE_UINT64_FIELD(anchorShardId);
|
||||||
WRITE_NODE_FIELD(taskPlacementList);
|
WRITE_NODE_FIELD(taskPlacementList);
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern void RebuildQueryStrings(Job *workerJob);
|
||||||
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
|
||||||
extern void SetTaskQuery(Task *task, Query *query);
|
extern void SetTaskQuery(Task *task, Query *query);
|
||||||
extern void SetTaskQueryString(Task *task, char *queryString);
|
extern void SetTaskQueryString(Task *task, char *queryString);
|
||||||
|
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
|
||||||
extern char * TaskQueryString(Task *task);
|
extern char * TaskQueryString(Task *task);
|
||||||
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
|
||||||
|
|
||||||
|
|
|
@ -240,6 +240,14 @@ typedef struct Task
|
||||||
char *queryStringLazy;
|
char *queryStringLazy;
|
||||||
List *perPlacementQueryStrings;
|
List *perPlacementQueryStrings;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* queryStringList contains query strings. They should be
|
||||||
|
* run sequentially. The concatenated version of this list
|
||||||
|
* will already be set for queryStringLazy, this can be useful
|
||||||
|
* when we want to access each query string.
|
||||||
|
*/
|
||||||
|
List *queryStringList;
|
||||||
|
|
||||||
Oid anchorDistributedTableId; /* only applies to insert tasks */
|
Oid anchorDistributedTableId; /* only applies to insert tasks */
|
||||||
uint64 anchorShardId; /* only applies to compute tasks */
|
uint64 anchorShardId; /* only applies to compute tasks */
|
||||||
List *taskPlacementList; /* only applies to compute tasks */
|
List *taskPlacementList; /* only applies to compute tasks */
|
||||||
|
|
Loading…
Reference in New Issue