mirror of https://github.com/citusdata/citus.git
refactor MapTaskList in multi physical planner (#3297)
parent
3b6b3f8c48
commit
a2f2107e6a
|
@ -194,6 +194,8 @@ static List * AssignDualHashTaskList(List *taskList);
|
||||||
static void AssignDataFetchDependencies(List *taskList);
|
static void AssignDataFetchDependencies(List *taskList);
|
||||||
static uint32 TaskListHighestTaskId(List *taskList);
|
static uint32 TaskListHighestTaskId(List *taskList);
|
||||||
static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList);
|
static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList);
|
||||||
|
static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
|
||||||
|
char *partitionColumnName);
|
||||||
static char * ColumnName(Var *column, List *rangeTableList);
|
static char * ColumnName(Var *column, List *rangeTableList);
|
||||||
static StringInfo SplitPointArrayString(ArrayType *splitPointObject,
|
static StringInfo SplitPointArrayString(ArrayType *splitPointObject,
|
||||||
Oid columnType, int32 columnTypeMod);
|
Oid columnType, int32 columnTypeMod);
|
||||||
|
@ -4171,9 +4173,6 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
|
||||||
List *rangeTableList = filterQuery->rtable;
|
List *rangeTableList = filterQuery->rtable;
|
||||||
ListCell *filterTaskCell = NULL;
|
ListCell *filterTaskCell = NULL;
|
||||||
Var *partitionColumn = mapMergeJob->partitionColumn;
|
Var *partitionColumn = mapMergeJob->partitionColumn;
|
||||||
Oid partitionColumnType = partitionColumn->vartype;
|
|
||||||
char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType);
|
|
||||||
int32 partitionColumnTypeMod = partitionColumn->vartypmod;
|
|
||||||
char *partitionColumnName = NULL;
|
char *partitionColumnName = NULL;
|
||||||
|
|
||||||
List *groupClauseList = filterQuery->groupClause;
|
List *groupClauseList = filterQuery->groupClause;
|
||||||
|
@ -4194,56 +4193,8 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
|
||||||
foreach(filterTaskCell, filterTaskList)
|
foreach(filterTaskCell, filterTaskList)
|
||||||
{
|
{
|
||||||
Task *filterTask = (Task *) lfirst(filterTaskCell);
|
Task *filterTask = (Task *) lfirst(filterTaskCell);
|
||||||
uint64 jobId = filterTask->jobId;
|
StringInfo mapQueryString = CreateMapQueryString(mapMergeJob, filterTask,
|
||||||
uint32 taskId = filterTask->taskId;
|
partitionColumnName);
|
||||||
|
|
||||||
/* wrap repartition query string around filter query string */
|
|
||||||
StringInfo mapQueryString = makeStringInfo();
|
|
||||||
char *filterQueryString = filterTask->queryString;
|
|
||||||
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
|
||||||
|
|
||||||
PartitionType partitionType = mapMergeJob->partitionType;
|
|
||||||
if (partitionType == RANGE_PARTITION_TYPE)
|
|
||||||
{
|
|
||||||
ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray;
|
|
||||||
uint32 intervalCount = mapMergeJob->partitionCount;
|
|
||||||
|
|
||||||
ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount);
|
|
||||||
StringInfo splitPointString = SplitPointArrayString(splitPointObject,
|
|
||||||
partitionColumnType,
|
|
||||||
partitionColumnTypeMod);
|
|
||||||
|
|
||||||
appendStringInfo(mapQueryString, RANGE_PARTITION_COMMAND, jobId, taskId,
|
|
||||||
filterQueryEscapedText, partitionColumnName,
|
|
||||||
partitionColumnTypeFullName, splitPointString->data);
|
|
||||||
}
|
|
||||||
else if (partitionType == SINGLE_HASH_PARTITION_TYPE)
|
|
||||||
{
|
|
||||||
ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray;
|
|
||||||
uint32 intervalCount = mapMergeJob->partitionCount;
|
|
||||||
|
|
||||||
ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount);
|
|
||||||
StringInfo splitPointString = SplitPointArrayString(splitPointObject,
|
|
||||||
partitionColumnType,
|
|
||||||
partitionColumnTypeMod);
|
|
||||||
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
|
|
||||||
filterQueryEscapedText, partitionColumnName,
|
|
||||||
partitionColumnTypeFullName, splitPointString->data);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
uint32 partitionCount = mapMergeJob->partitionCount;
|
|
||||||
ShardInterval **intervalArray =
|
|
||||||
GenerateSyntheticShardIntervalArray(partitionCount);
|
|
||||||
ArrayType *splitPointObject = SplitPointObject(intervalArray,
|
|
||||||
mapMergeJob->partitionCount);
|
|
||||||
StringInfo splitPointString =
|
|
||||||
SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID));
|
|
||||||
|
|
||||||
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
|
|
||||||
filterQueryEscapedText, partitionColumnName,
|
|
||||||
partitionColumnTypeFullName, splitPointString->data);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* convert filter query task into map task */
|
/* convert filter query task into map task */
|
||||||
Task *mapTask = filterTask;
|
Task *mapTask = filterTask;
|
||||||
|
@ -4257,6 +4208,60 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateMapQueryString creates and returns the map query string for the given filterTask.
|
||||||
|
*/
|
||||||
|
static StringInfo
|
||||||
|
CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
|
||||||
|
char *partitionColumnName)
|
||||||
|
{
|
||||||
|
uint64 jobId = filterTask->jobId;
|
||||||
|
uint32 taskId = filterTask->taskId;
|
||||||
|
|
||||||
|
/* wrap repartition query string around filter query string */
|
||||||
|
StringInfo mapQueryString = makeStringInfo();
|
||||||
|
char *filterQueryString = filterTask->queryString;
|
||||||
|
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
|
||||||
|
PartitionType partitionType = mapMergeJob->partitionType;
|
||||||
|
|
||||||
|
Var *partitionColumn = mapMergeJob->partitionColumn;
|
||||||
|
Oid partitionColumnType = partitionColumn->vartype;
|
||||||
|
char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType);
|
||||||
|
int32 partitionColumnTypeMod = partitionColumn->vartypmod;
|
||||||
|
|
||||||
|
ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray;
|
||||||
|
uint32 intervalCount = mapMergeJob->partitionCount;
|
||||||
|
|
||||||
|
if (partitionType != SINGLE_HASH_PARTITION_TYPE && partitionType !=
|
||||||
|
RANGE_PARTITION_TYPE)
|
||||||
|
{
|
||||||
|
partitionColumnType = INT4OID;
|
||||||
|
partitionColumnTypeMod = get_typmodin(INT4OID);
|
||||||
|
intervalArray = GenerateSyntheticShardIntervalArray(intervalCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount);
|
||||||
|
StringInfo splitPointString = SplitPointArrayString(splitPointObject,
|
||||||
|
partitionColumnType,
|
||||||
|
partitionColumnTypeMod);
|
||||||
|
|
||||||
|
char *partitionCommand = NULL;
|
||||||
|
if (partitionType == RANGE_PARTITION_TYPE)
|
||||||
|
{
|
||||||
|
partitionCommand = RANGE_PARTITION_COMMAND;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
partitionCommand = HASH_PARTITION_COMMAND;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfo(mapQueryString, partitionCommand, jobId, taskId,
|
||||||
|
filterQueryEscapedText, partitionColumnName,
|
||||||
|
partitionColumnTypeFullName, splitPointString->data);
|
||||||
|
return mapQueryString;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateSyntheticShardIntervalArray returns a shard interval pointer array
|
* GenerateSyntheticShardIntervalArray returns a shard interval pointer array
|
||||||
* which has a uniform hash distribution for the given input partitionCount.
|
* which has a uniform hash distribution for the given input partitionCount.
|
||||||
|
|
Loading…
Reference in New Issue