Merge pull request #3659 from citusdata/refactor/queryString

refactor query string of task
pull/3681/head^2
SaitTalhaNisanci 2020-03-31 16:12:24 +03:00 committed by GitHub
commit e04a307a4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 274 additions and 108 deletions

View File

@ -3196,18 +3196,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
int querySent = 0;
char *queryString = NULL;
if (list_length(task->perPlacementQueryStrings) == 0)
{
queryString = TaskQueryString(task);
}
else
{
Assert(list_length(task->taskPlacementList) == list_length(
task->perPlacementQueryStrings));
queryString = list_nth(task->perPlacementQueryStrings,
placementExecution->placementExecutionIndex);
}
char *queryString = TaskQueryStringForPlacement(task,
placementExecution->
placementExecutionIndex);
if (execution->transactionProperties->useRemoteTransactionBlocks !=
TRANSACTION_BLOCKS_DISALLOWED)

View File

@ -221,16 +221,15 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
shardPlacement->nodeId,
quote_literal_cstr(taskPrefix),
quote_literal_cstr(taskPrefix),
quote_literal_cstr(TaskQueryString(selectTask)),
quote_literal_cstr(TaskQueryStringForAllPlacements(
selectTask)),
partitionColumnIndex,
quote_literal_cstr(partitionMethodString),
minValuesString->data, maxValuesString->data,
binaryFormatString);
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
}
SetTaskQueryString(selectTask, NULL);
selectTask->perPlacementQueryStrings = perPlacementQueries;
SetTaskPerPlacementQueryStrings(selectTask, perPlacementQueries);
}
}

View File

@ -1062,7 +1062,7 @@ IsRedistributablePlan(Plan *selectPlan)
/*
* WrapTaskListForProjection wraps task->queryString to only select given
* WrapTaskListForProjection wraps task query string to only select given
* projected columns. It modifies the taskList.
*/
static void
@ -1091,7 +1091,7 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
projectedColumnsString->data,
TaskQueryString(task));
TaskQueryStringForAllPlacements(task));
SetTaskQueryString(task, wrappedQuery->data);
}
}

View File

@ -201,7 +201,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
taskParameterTypes = NULL;
}
Query *shardQuery = ParseQueryString(TaskQueryString(task),
Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task),
taskParameterTypes,
taskNumParams);
@ -220,9 +220,16 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
LogLocalCommand(task);
char *shardQueryString = task->queryStringLazy
? task->queryStringLazy
: "<optimized out by local execution>";
char *shardQueryString = NULL;
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{
shardQueryString = TaskQueryStringForAllPlacements(task);
}
else
{
/* avoid the overhead of deparsing when using local execution */
shardQueryString = "<optimized out by local execution>";
}
totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString);
@ -302,7 +309,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList)
foreach_ptr(localTask, localTaskList)
{
const char *localTaskQueryCommand = TaskQueryString(localTask);
const char *localTaskQueryCommand = TaskQueryStringForAllPlacements(localTask);
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
@ -390,7 +397,7 @@ LogLocalCommand(Task *task)
}
ereport(NOTICE, (errmsg("executing the command locally: %s",
ApplyLogRedaction(TaskQueryString(task)))));
ApplyLogRedaction(TaskQueryStringForAllPlacements(task)))));
}

View File

@ -1570,7 +1570,8 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
*/
StringInfo sqlTaskQueryString = makeStringInfo();
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task));
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryStringForAllPlacements(
task));
if (BinaryMasterCopyFormat)
{
@ -1605,7 +1606,8 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
HTAB *taskStateHash = taskTracker->taskStateHash;
/* wrap a task assignment query outside the original query */
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task));
StringInfo taskAssignmentQuery =
TaskAssignmentQuery(task, TaskQueryStringForAllPlacements(task));
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
task->taskId);
@ -2742,7 +2744,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
{
/* assign through task tracker to manage resource utilization */
StringInfo jobCleanupQuery = TaskAssignmentQuery(
jobCleanupTask, TaskQueryString(jobCleanupTask));
jobCleanupTask, TaskQueryStringForAllPlacements(jobCleanupTask));
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
jobCleanupQuery->data);
@ -2821,7 +2823,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodeName, nodePort, (int) queryStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, TaskQueryString(
nodePort, TaskQueryStringForAllPlacements(
jobCleanupTask))));
}
else
@ -2840,7 +2842,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodePort, (int) resultStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, TaskQueryString(jobCleanupTask))));
nodePort, TaskQueryStringForAllPlacements(
jobCleanupTask))));
}
else
{

View File

@ -447,7 +447,8 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
* connect to that node to drop the shard placement over that
* remote connection.
*/
const char *dropShardPlacementCommand = TaskQueryString(task);
const char *dropShardPlacementCommand = TaskQueryStringForAllPlacements(
task);
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
relationName,
dropShardPlacementCommand);

View File

@ -41,6 +41,7 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query);
static bool IsEachPlacementQueryStringDifferent(Task *task);
/*
@ -111,11 +112,13 @@ RebuildQueryStrings(Job *workerJob)
}
}
bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
GetTaskQueryType(task) == TASK_QUERY_OBJECT;
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
task->queryForLocalExecution == NULL &&
task->queryStringLazy == NULL
!isQueryObjectOrText
? "(null)"
: ApplyLogRedaction(TaskQueryString(task)))));
: ApplyLogRedaction(TaskQueryStringForAllPlacements(
task)))));
UpdateTaskQueryString(query, relationId, valuesRTE, task);
@ -126,7 +129,8 @@ RebuildQueryStrings(Job *workerJob)
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(TaskQueryString(task)))));
ApplyLogRedaction(TaskQueryStringForAllPlacements(
task)))));
}
}
@ -181,7 +185,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
task->anchorDistributedTableId = distributedTableId;
}
SetTaskQuery(task, query);
SetTaskQueryIfShouldLazyDeparse(task, query);
if (valuesRTE != NULL)
{
@ -412,36 +416,47 @@ ShouldLazyDeparseQuery(Task *task)
/*
* SetTaskQuery attaches the query to the task so that it can be used during
* execution. If local execution can possibly take place it sets task->queryForLocalExecution.
* SetTaskQueryIfShouldLazyDeparse attaches the query to the task so that it can be used during
* execution. If local execution can possibly take place it sets task->jobQueryReferenceForLazyDeparsing.
* If not it deparses the query and sets queryStringLazy, to avoid blowing the
* size of the task unnecesarily.
*/
void
SetTaskQuery(Task *task, Query *query)
SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
{
if (ShouldLazyDeparseQuery(task))
{
task->queryForLocalExecution = query;
task->queryStringLazy = NULL;
task->taskQuery.queryType = TASK_QUERY_OBJECT;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
return;
}
task->queryForLocalExecution = NULL;
task->queryStringLazy = DeparseTaskQuery(task, query);
SetTaskQueryString(task, DeparseTaskQuery(task, query));
}
/*
* SetTaskQueryString attaches the query string to the task so that it can be
* used during execution. It also unsets queryForLocalExecution to be sure
* used during execution. It also unsets jobQueryReferenceForLazyDeparsing to be sure
* these are kept in sync.
*/
void
SetTaskQueryString(Task *task, char *queryString)
{
task->queryForLocalExecution = NULL;
task->queryStringLazy = queryString;
task->taskQuery.queryType = TASK_QUERY_TEXT;
task->taskQuery.data.queryStringLazy = queryString;
}
/*
* SetTaskPerPlacementQueryStrings set the perPlacementQueryString for the given task.
*/
void
SetTaskPerPlacementQueryStrings(Task *task, List *perPlacementQueryStringList)
{
Assert(perPlacementQueryStringList != NIL);
task->taskQuery.queryType = TASK_QUERY_TEXT_PER_PLACEMENT;
task->taskQuery.data.perPlacementQueryStrings = perPlacementQueryStringList;
}
@ -451,8 +466,8 @@ SetTaskQueryString(Task *task, char *queryString)
void
SetTaskQueryStringList(Task *task, List *queryStringList)
{
task->queryStringList = queryStringList;
SetTaskQueryString(task, StringJoin(queryStringList, ';'));
task->taskQuery.queryType = TASK_QUERY_TEXT_LIST;
task->taskQuery.data.queryStringList = queryStringList;
}
@ -486,33 +501,80 @@ DeparseTaskQuery(Task *task, Query *query)
/*
* TaskQueryString generates task->queryStringLazy if missing.
* GetTaskQueryType returns the type of the task query.
*/
int
GetTaskQueryType(Task *task)
{
return task->taskQuery.queryType;
}
/*
* TaskQueryStringForAllPlacements generates task query string text if missing.
*
* For performance reasons, the queryString is generated lazily. For example
* for local queries it is usually not needed to generate it, so this way we
* can skip the expensive deparsing+parsing.
*/
char *
TaskQueryString(Task *task)
TaskQueryStringForAllPlacements(Task *task)
{
if (task->queryStringLazy != NULL)
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{
return task->queryStringLazy;
return StringJoin(task->taskQuery.data.queryStringList, ';');
}
Assert(task->queryForLocalExecution != NULL);
if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
{
return task->taskQuery.data.queryStringLazy;
}
Query *jobQueryReferenceForLazyDeparsing =
task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
Assert(task->taskQuery.queryType == TASK_QUERY_OBJECT &&
jobQueryReferenceForLazyDeparsing != NULL);
/*
* Switch to the memory context of task->queryForLocalExecution before generating the query
* Switch to the memory context of task->jobQueryReferenceForLazyDeparsing before generating the query
* string. This way the query string is not freed in between multiple
* executions of a prepared statement. Except when UpdateTaskQueryString is
* used to set task->queryForLocalExecution, in that case it is freed but it will be set to
* used to set task->jobQueryReferenceForLazyDeparsing, in that case it is freed but it will be set to
* NULL on the next execution of the query because UpdateTaskQueryString
* does that.
*/
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
task->queryForLocalExecution));
task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution);
jobQueryReferenceForLazyDeparsing));
char *queryString = DeparseTaskQuery(task, jobQueryReferenceForLazyDeparsing);
MemoryContextSwitchTo(previousContext);
return task->queryStringLazy;
SetTaskQueryString(task, queryString);
return task->taskQuery.data.queryStringLazy;
}
/*
* TaskQueryStringForPlacement returns the query string that should be executed
* on the placement with the given placementIndex.
*/
char *
TaskQueryStringForPlacement(Task *task, int placementIndex)
{
if (IsEachPlacementQueryStringDifferent(task))
{
List *perPlacementQueryStringList =
task->taskQuery.data.perPlacementQueryStrings;
Assert(list_length(perPlacementQueryStringList) > placementIndex);
return list_nth(perPlacementQueryStringList, placementIndex);
}
return TaskQueryStringForAllPlacements(task);
}
/*
* IsEachPlacementQueryStringDifferent returns true if each placement
* has a different query string.
*/
static bool
IsEachPlacementQueryStringDifferent(Task *task)
{
return GetTaskQueryType(task) == TASK_QUERY_TEXT_PER_PLACEMENT;
}

View File

@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
task = CitusMakeNode(Task);
task->taskType = SELECT_TASK;
task->taskPlacementList = placementList;
SetTaskQuery(task, planContext->query);
SetTaskQueryIfShouldLazyDeparse(task, planContext->query);
task->anchorShardId = shardInterval->shardId;
task->replicationModel = distTable->replicationModel;

View File

@ -399,7 +399,9 @@ RemoteExplain(Task *task, ExplainState *es)
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan));
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(
task),
es);
/*
* Use a coordinated transaction to ensure that we open a transaction block

View File

@ -4388,7 +4388,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
/* wrap repartition query string around filter query string */
StringInfo mapQueryString = makeStringInfo();
char *filterQueryString = TaskQueryString(filterTask);
char *filterQueryString = TaskQueryStringForAllPlacements(filterTask);
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
PartitionType partitionType = mapMergeJob->partitionType;

View File

@ -1820,7 +1820,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
* that the query cannot be executed locally.
*/
task->taskPlacementList = placementList;
SetTaskQuery(task, query);
SetTaskQueryIfShouldLazyDeparse(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;
task->relationShardList = relationShardList;
@ -1901,7 +1901,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
}
task->taskPlacementList = placementList;
SetTaskQuery(task, query);
SetTaskQueryIfShouldLazyDeparse(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;
task->relationShardList = relationShardList;

View File

@ -73,6 +73,7 @@ CitusSetTag(Node *node, int tag)
} \
while (0)
static void CopyTaskQuery(Task *newnode, Task *from);
static void
copyJobInfo(Job *newnode, Job *from)
@ -250,6 +251,44 @@ CopyNodeRelationRowLock(COPYFUNC_ARGS)
}
static void
CopyTaskQuery(Task *newnode, Task *from)
{
COPY_SCALAR_FIELD(taskQuery.queryType);
switch (from->taskQuery.queryType)
{
case TASK_QUERY_TEXT:
{
COPY_STRING_FIELD(taskQuery.data.queryStringLazy);
break;
}
case TASK_QUERY_OBJECT:
{
COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST:
{
COPY_NODE_FIELD(taskQuery.data.queryStringList);
break;
}
default:
{
break;
}
}
}
void
CopyNodeTask(COPYFUNC_ARGS)
{
@ -258,10 +297,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(taskType);
COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId);
COPY_NODE_FIELD(queryForLocalExecution);
COPY_STRING_FIELD(queryStringLazy);
COPY_NODE_FIELD(perPlacementQueryStrings);
COPY_NODE_FIELD(queryStringList);
CopyTaskQuery(newnode, from);
COPY_SCALAR_FIELD(anchorDistributedTableId);
COPY_SCALAR_FIELD(anchorShardId);
COPY_NODE_FIELD(taskPlacementList);

View File

@ -135,7 +135,7 @@
#define booltostr(x) ((x) ? "true" : "false")
static void WriteTaskQuery(OUTFUNC_ARGS);
/*****************************************************************************
* Output routines for Citus node types
@ -469,6 +469,43 @@ OutRelationRowLock(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(rowLockStrength, LockClauseStrength);
}
static void WriteTaskQuery(OUTFUNC_ARGS) {
WRITE_LOCALS(Task);
WRITE_ENUM_FIELD(taskQuery.queryType, TaskQueryType);
switch (node->taskQuery.queryType)
{
case TASK_QUERY_TEXT:
{
WRITE_STRING_FIELD(taskQuery.data.queryStringLazy);
break;
}
case TASK_QUERY_OBJECT:
{
WRITE_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}
case TASK_QUERY_TEXT_PER_PLACEMENT:
{
WRITE_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
break;
}
case TASK_QUERY_TEXT_LIST:
{
WRITE_NODE_FIELD(taskQuery.data.queryStringList);
break;
}
default:
{
break;
}
}
}
void
OutTask(OUTFUNC_ARGS)
@ -479,10 +516,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(taskType, TaskType);
WRITE_UINT64_FIELD(jobId);
WRITE_UINT_FIELD(taskId);
WRITE_NODE_FIELD(queryForLocalExecution);
WRITE_STRING_FIELD(queryStringLazy);
WRITE_NODE_FIELD(perPlacementQueryStrings);
WRITE_NODE_FIELD(queryStringList);
WriteTaskQuery(str, raw_node);
WRITE_OID_FIELD(anchorDistributedTableId);
WRITE_UINT64_FIELD(anchorShardId);
WRITE_NODE_FIELD(taskPlacementList);

View File

@ -76,6 +76,7 @@ extern void CopyNodeRelationShard(COPYFUNC_ARGS);
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
extern void CopyNodeTask(COPYFUNC_ARGS);
extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS);
extern void CopyNodeTaskQuery(COPYFUNC_ARGS);
extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);

View File

@ -23,10 +23,14 @@
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQuery(Task *task, Query *query);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern char * TaskQueryString(Task *task);
extern void SetTaskPerPlacementQueryStrings(Task *task,
List *perPlacementQueryStringList);
extern char * TaskQueryStringForAllPlacements(Task *task);
extern char * TaskQueryStringForPlacement(Task *task, int placementIndex);
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
extern int GetTaskQueryType(Task *task);
#endif /* DEPARSE_SHARD_QUERY_H */

View File

@ -203,15 +203,22 @@ typedef struct MapMergeJob
*/
typedef struct TaskExecution TaskExecution;
typedef struct Task
typedef enum TaskQueryType
{
CitusNode type;
TaskType taskType;
uint64 jobId;
uint32 taskId;
TASK_QUERY_TEXT,
TASK_QUERY_OBJECT,
TASK_QUERY_TEXT_PER_PLACEMENT,
TASK_QUERY_TEXT_LIST
} TaskQueryType;
typedef struct TaskQuery
{
TaskQueryType queryType;
union
{
/*
* For most queries queryForLocalExecution and/or queryStringLazy is not
* For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not
* NULL. This means we have a single query for all placements.
*
* If this is not the case, the length of perPlacementQueryStrings is
@ -220,24 +227,28 @@ typedef struct Task
* when a query should return node specific values. For example, on which
* node did we succeed storing some result files?
*
* queryForLocalExecution is only not null when the planner thinks the
* jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the
* query could possibly be locally executed. In that case deparsing+parsing
* the query might not be necessary, so we do that lazily.
*
* queryForLocalExecution should only be set by using SetTaskQuery()
* jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse()
*/
Query *queryForLocalExecution;
Query *jobQueryReferenceForLazyDeparsing;
/*
* In almost all cases queryStringLazy should be read only indirectly by
* using TaskQueryString(). This will populate the field if only the
* queryForLocalExecution field is not NULL.
* using TaskQueryStringForAllPlacements(). This will populate the field if only the
* jobQueryReferenceForLazyDeparsing field is not NULL.
*
* This field should only be set by using SetTaskQueryString() (or as a
* side effect from TaskQueryString()). Otherwise it might not be in sync
* with queryForLocalExecution.
* side effect from TaskQueryStringForAllPlacements()). Otherwise it might not be in sync
* with jobQueryReferenceForLazyDeparsing.
*/
char *queryStringLazy;
/*
* perPlacementQueryStrings is used when we have different query strings for each placement.
*/
List *perPlacementQueryStrings;
/*
@ -247,6 +258,21 @@ typedef struct Task
* when we want to access each query string.
*/
List *queryStringList;
}data;
}TaskQuery;
typedef struct Task
{
CitusNode type;
TaskType taskType;
uint64 jobId;
uint32 taskId;
/*
* taskQuery contains query string information. The way we get queryString can be different
* so this is abstracted with taskQuery.
*/
TaskQuery taskQuery;
Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */