Remove unused reduceQuery from physical planning (#6221)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/6225/head^2
Marco Slot 2022-08-24 19:24:27 +02:00 committed by GitHub
parent 1f4fe35512
commit ac07d33a29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 7 additions and 16 deletions

View File

@ -490,6 +490,10 @@ RangePartitionJoinBaseRelationId(MultiJoin *joinNode)
{ {
partitionNode = (MultiPartition *) rightChildNode; partitionNode = (MultiPartition *) rightChildNode;
} }
else
{
Assert(false);
}
Index baseTableId = partitionNode->splitPointTableId; Index baseTableId = partitionNode->splitPointTableId;
MultiTable *baseTable = FindTableNode((MultiNode *) joinNode, baseTableId); MultiTable *baseTable = FindTableNode((MultiNode *) joinNode, baseTableId);
@ -575,12 +579,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
Job *job = (Job *) linitial(dependentJobList); Job *job = (Job *) linitial(dependentJobList);
if (CitusIsA(job, MapMergeJob)) if (CitusIsA(job, MapMergeJob))
{ {
MapMergeJob *mapMergeJob = (MapMergeJob *) job;
isRepartitionJoin = true; isRepartitionJoin = true;
if (mapMergeJob->reduceQuery)
{
updateColumnAttributes = false;
}
} }
} }
@ -4671,18 +4670,13 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
for (uint32 partitionId = initialPartitionId; partitionId < partitionCount; for (uint32 partitionId = initialPartitionId; partitionId < partitionCount;
partitionId++) partitionId++)
{ {
Task *mergeTask = NULL;
List *mapOutputFetchTaskList = NIL; List *mapOutputFetchTaskList = NIL;
ListCell *mapTaskCell = NULL; ListCell *mapTaskCell = NULL;
uint32 mergeTaskId = taskIdIndex; uint32 mergeTaskId = taskIdIndex;
Query *reduceQuery = mapMergeJob->reduceQuery; /* create logical merge task (not executed, but useful for bookkeeping) */
if (reduceQuery == NULL) Task *mergeTask = CreateBasicTask(jobId, mergeTaskId, MERGE_TASK,
{ "<merge>");
/* create logical merge task (not executed, but useful for bookkeeping) */
mergeTask = CreateBasicTask(jobId, mergeTaskId, MERGE_TASK,
"<merge>");
}
mergeTask->partitionId = partitionId; mergeTask->partitionId = partitionId;
taskIdIndex++; taskIdIndex++;

View File

@ -198,7 +198,6 @@ CopyNodeMapMergeJob(COPYFUNC_ARGS)
copyJobInfo(&newnode->job, &from->job); copyJobInfo(&newnode->job, &from->job);
COPY_NODE_FIELD(reduceQuery);
COPY_SCALAR_FIELD(partitionType); COPY_SCALAR_FIELD(partitionType);
COPY_NODE_FIELD(partitionColumn); COPY_NODE_FIELD(partitionColumn);
COPY_SCALAR_FIELD(partitionCount); COPY_SCALAR_FIELD(partitionCount);

View File

@ -401,7 +401,6 @@ OutMapMergeJob(OUTFUNC_ARGS)
WRITE_NODE_TYPE("MAPMERGEJOB"); WRITE_NODE_TYPE("MAPMERGEJOB");
OutJobFields(str, (Job *) node); OutJobFields(str, (Job *) node);
WRITE_NODE_FIELD(reduceQuery);
WRITE_ENUM_FIELD(partitionType, PartitionType); WRITE_ENUM_FIELD(partitionType, PartitionType);
WRITE_NODE_FIELD(partitionColumn); WRITE_NODE_FIELD(partitionColumn);
WRITE_UINT_FIELD(partitionCount); WRITE_UINT_FIELD(partitionCount);

View File

@ -160,7 +160,6 @@ typedef struct Job
typedef struct MapMergeJob typedef struct MapMergeJob
{ {
Job job; Job job;
Query *reduceQuery;
PartitionType partitionType; PartitionType partitionType;
Var *partitionColumn; Var *partitionColumn;
uint32 partitionCount; uint32 partitionCount;