From a2f2107e6a7bdb74a8570aeeb3f6e1e92fec94dd Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 13 Dec 2019 22:41:49 +0300 Subject: [PATCH] refactor MapTaskList in multi physical planner (#3297) --- .../planner/multi_physical_planner.c | 111 +++++++++--------- 1 file changed, 58 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 375f94945..014c164a5 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -194,6 +194,8 @@ static List * AssignDualHashTaskList(List *taskList); static void AssignDataFetchDependencies(List *taskList); static uint32 TaskListHighestTaskId(List *taskList); static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList); +static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, + char *partitionColumnName); static char * ColumnName(Var *column, List *rangeTableList); static StringInfo SplitPointArrayString(ArrayType *splitPointObject, Oid columnType, int32 columnTypeMod); @@ -4171,9 +4173,6 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) List *rangeTableList = filterQuery->rtable; ListCell *filterTaskCell = NULL; Var *partitionColumn = mapMergeJob->partitionColumn; - Oid partitionColumnType = partitionColumn->vartype; - char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType); - int32 partitionColumnTypeMod = partitionColumn->vartypmod; char *partitionColumnName = NULL; List *groupClauseList = filterQuery->groupClause; @@ -4194,56 +4193,8 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) foreach(filterTaskCell, filterTaskList) { Task *filterTask = (Task *) lfirst(filterTaskCell); - uint64 jobId = filterTask->jobId; - uint32 taskId = filterTask->taskId; - - /* wrap repartition query string around filter query string */ - StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = filterTask->queryString; - char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); - - PartitionType partitionType = mapMergeJob->partitionType; - if (partitionType == RANGE_PARTITION_TYPE) - { - ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; - uint32 intervalCount = mapMergeJob->partitionCount; - - ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); - StringInfo splitPointString = SplitPointArrayString(splitPointObject, - partitionColumnType, - partitionColumnTypeMod); - - appendStringInfo(mapQueryString, RANGE_PARTITION_COMMAND, jobId, taskId, - filterQueryEscapedText, partitionColumnName, - partitionColumnTypeFullName, splitPointString->data); - } - else if (partitionType == SINGLE_HASH_PARTITION_TYPE) - { - ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; - uint32 intervalCount = mapMergeJob->partitionCount; - - ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); - StringInfo splitPointString = SplitPointArrayString(splitPointObject, - partitionColumnType, - partitionColumnTypeMod); - appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, - filterQueryEscapedText, partitionColumnName, - partitionColumnTypeFullName, splitPointString->data); - } - else - { - uint32 partitionCount = mapMergeJob->partitionCount; - ShardInterval **intervalArray = - GenerateSyntheticShardIntervalArray(partitionCount); - ArrayType *splitPointObject = SplitPointObject(intervalArray, - mapMergeJob->partitionCount); - StringInfo splitPointString = - SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID)); - - appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, - filterQueryEscapedText, partitionColumnName, - partitionColumnTypeFullName, splitPointString->data); - } + StringInfo mapQueryString = CreateMapQueryString(mapMergeJob, filterTask, + partitionColumnName); /* convert filter query task into map task */ Task *mapTask = filterTask; @@ -4257,6 +4208,60 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) } +/* + * CreateMapQueryString creates and returns the map query string for the given filterTask. + */ +static StringInfo +CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, + char *partitionColumnName) +{ + uint64 jobId = filterTask->jobId; + uint32 taskId = filterTask->taskId; + + /* wrap repartition query string around filter query string */ + StringInfo mapQueryString = makeStringInfo(); + char *filterQueryString = filterTask->queryString; + char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); + PartitionType partitionType = mapMergeJob->partitionType; + + Var *partitionColumn = mapMergeJob->partitionColumn; + Oid partitionColumnType = partitionColumn->vartype; + char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType); + int32 partitionColumnTypeMod = partitionColumn->vartypmod; + + ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; + uint32 intervalCount = mapMergeJob->partitionCount; + + if (partitionType != SINGLE_HASH_PARTITION_TYPE && partitionType != + RANGE_PARTITION_TYPE) + { + partitionColumnType = INT4OID; + partitionColumnTypeMod = get_typmodin(INT4OID); + intervalArray = GenerateSyntheticShardIntervalArray(intervalCount); + } + + ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); + StringInfo splitPointString = SplitPointArrayString(splitPointObject, + partitionColumnType, + partitionColumnTypeMod); + + char *partitionCommand = NULL; + if (partitionType == RANGE_PARTITION_TYPE) + { + partitionCommand = RANGE_PARTITION_COMMAND; + } + else + { + partitionCommand = HASH_PARTITION_COMMAND; + } + + appendStringInfo(mapQueryString, partitionCommand, jobId, taskId, + filterQueryEscapedText, partitionColumnName, + partitionColumnTypeFullName, splitPointString->data); + return mapQueryString; +} + + /* * GenerateSyntheticShardIntervalArray returns a shard interval pointer array * which has a uniform hash distribution for the given input partitionCount.