citus/src/backend/distributed/executor/subplan_execution.c

80 lines
2.2 KiB
C

/*-------------------------------------------------------------------------
*
* subplan_execution.c
*
* Functions for execution subplans prior to distributed table execution.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "executor/executor.h"
int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */
/* when this is true, we enforce intermediate result size limit in all executors */
int SubPlanLevel = 0;
/*
* ExecuteSubPlans executes a list of subplans from a distributed plan
* by sequentially executing each plan from the top.
*/
void
ExecuteSubPlans(DistributedPlan *distributedPlan)
{
uint64 planId = distributedPlan->planId;
List *subPlanList = distributedPlan->subPlanList;
ListCell *subPlanCell = NULL;
List *nodeList = NIL;
bool writeLocalFile = false;
if (subPlanList == NIL)
{
/* no subplans to execute */
return;
}
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results of subplans will be stored in a directory that is
* derived from the distributed transaction ID.
*/
BeginOrContinueCoordinatedTransaction();
nodeList = ActiveReadableNodeList();
foreach(subPlanCell, subPlanList)
{
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
PlannedStmt *plannedStmt = subPlan->plan;
uint32 subPlanId = subPlan->subPlanId;
DestReceiver *copyDest = NULL;
ParamListInfo params = NULL;
EState *estate = NULL;
char *resultId = GenerateResultId(planId, subPlanId);
SubPlanLevel++;
estate = CreateExecutorState();
copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate,
nodeList,
writeLocalFile);
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
SubPlanLevel--;
FreeExecutorState(estate);
}
}