From dd1a456407cfb42aff07179f3fad7ea1d629834c Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 26 Mar 2020 12:04:08 +0300 Subject: [PATCH] store query command list in task (#3649) Sometimes we have concatenated query strings for a task. However, when we want to find each query string, it is not a trivial task. Therefore, it makes sense to store this in task so that when we need each query string we can easily get it. --- src/backend/distributed/commands/utility_hook.c | 3 +-- .../distributed/master/master_stage_protocol.c | 3 ++- src/backend/distributed/planner/deparse_shard_query.c | 11 +++++++++++ src/backend/distributed/utils/citus_copyfuncs.c | 1 + src/backend/distributed/utils/citus_outfuncs.c | 1 + src/include/distributed/deparse_shard_query.h | 1 + src/include/distributed/multi_physical_planner.h | 8 ++++++++ 7 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 976f24cb7..40a9e7770 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -927,7 +927,6 @@ List * NodeDDLTaskList(TargetWorkerSet targets, List *commands) { List *workerNodes = TargetWorkerSetNodeList(targets, NoLock); - char *concatenatedCommands = StringJoin(commands, ';'); if (list_length(workerNodes) <= 0) { @@ -940,7 +939,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands) Task *task = CitusMakeNode(Task); task->taskType = DDL_TASK; - SetTaskQueryString(task, concatenatedCommands); + SetTaskQueryStringList(task, commands); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodes) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index c58573ae0..34b6c4ad8 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -513,7 +513,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, task->jobId = INVALID_JOB_ID; task->taskId = taskId++; task->taskType = DDL_TASK; - SetTaskQueryString(task, StringJoin(commandList, ';')); + SetTaskQueryStringList(task, commandList); + task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NIL; task->anchorShardId = shardId; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 3d76f25c0..3651da73a 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -445,6 +445,17 @@ SetTaskQueryString(Task *task, char *queryString) } +/* + * SetTaskQueryStringList sets the queryStringList of the given task. + */ +void +SetTaskQueryStringList(Task *task, List *queryStringList) +{ + task->queryStringList = queryStringList; + SetTaskQueryString(task, StringJoin(queryStringList, ';')); +} + + /* * DeparseTaskQuery is a general way of deparsing a query based on a task. */ diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 26ede196e..524d85006 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -260,6 +260,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskId); COPY_NODE_FIELD(queryForLocalExecution); COPY_STRING_FIELD(queryStringLazy); + COPY_NODE_FIELD(queryStringList); COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 16e376cf0..4d35c0b2a 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -481,6 +481,7 @@ OutTask(OUTFUNC_ARGS) WRITE_UINT_FIELD(taskId); WRITE_NODE_FIELD(queryForLocalExecution); WRITE_STRING_FIELD(queryStringLazy); + WRITE_NODE_FIELD(queryStringList); WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 331903fa6..dc340821b 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -25,6 +25,7 @@ extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern void SetTaskQuery(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); +extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern char * TaskQueryString(Task *task); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index e8db507d9..84112d93b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -240,6 +240,14 @@ typedef struct Task char *queryStringLazy; List *perPlacementQueryStrings; + /* + * queryStringList contains query strings. They should be + * run sequentially. The concatenated version of this list + * will already be set for queryStringLazy, this can be useful + * when we want to access each query string. + */ + List *queryStringList; + Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */