Add DistributedSubPlan node

pull/1853/head
Marco Slot 2017-12-04 18:44:43 +01:00
parent b5784ca03a
commit 7d1191954d
7 changed files with 71 additions and 0 deletions

View File

@ -101,6 +101,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
{ {
DECLARE_FROM_AND_NEW_NODE(DistributedPlan); DECLARE_FROM_AND_NEW_NODE(DistributedPlan);
COPY_SCALAR_FIELD(planId);
COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(operation);
COPY_SCALAR_FIELD(hasReturning); COPY_SCALAR_FIELD(hasReturning);
@ -112,10 +113,23 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_NODE_FIELD(insertSelectSubquery); COPY_NODE_FIELD(insertSelectSubquery);
COPY_NODE_FIELD(insertTargetList); COPY_NODE_FIELD(insertTargetList);
COPY_SCALAR_FIELD(targetRelationId); COPY_SCALAR_FIELD(targetRelationId);
COPY_NODE_FIELD(subPlanList);
COPY_NODE_FIELD(planningError); COPY_NODE_FIELD(planningError);
} }
void
CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(DistributedSubPlan);
COPY_SCALAR_FIELD(subPlanId);
COPY_NODE_FIELD(plan);
}
void void
CopyNodeShardInterval(COPYFUNC_ARGS) CopyNodeShardInterval(COPYFUNC_ARGS)
{ {

View File

@ -32,6 +32,7 @@ static const char *CitusNodeTagNamesD[] = {
"Job", "Job",
"MapMergeJob", "MapMergeJob",
"DistributedPlan", "DistributedPlan",
"DistributedSubPlan",
"Task", "Task",
"TaskExecution", "TaskExecution",
"ShardInterval", "ShardInterval",
@ -384,6 +385,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
const ExtensibleNodeMethods nodeMethods[] = const ExtensibleNodeMethods nodeMethods[] =
{ {
DEFINE_NODE_METHODS(DistributedPlan), DEFINE_NODE_METHODS(DistributedPlan),
DEFINE_NODE_METHODS(DistributedSubPlan),
DEFINE_NODE_METHODS(Job), DEFINE_NODE_METHODS(Job),
DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(ShardInterval),
DEFINE_NODE_METHODS(MapMergeJob), DEFINE_NODE_METHODS(MapMergeJob),

View File

@ -175,6 +175,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
WRITE_UINT64_FIELD(planId);
WRITE_INT_FIELD(operation); WRITE_INT_FIELD(operation);
WRITE_BOOL_FIELD(hasReturning); WRITE_BOOL_FIELD(hasReturning);
@ -187,10 +188,24 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(insertTargetList); WRITE_NODE_FIELD(insertTargetList);
WRITE_OID_FIELD(targetRelationId); WRITE_OID_FIELD(targetRelationId);
WRITE_NODE_FIELD(subPlanList);
WRITE_NODE_FIELD(planningError); WRITE_NODE_FIELD(planningError);
} }
void
OutDistributedSubPlan(OUTFUNC_ARGS)
{
WRITE_LOCALS(DistributedSubPlan);
WRITE_NODE_TYPE("DISTRIBUTEDSUBPLAN");
WRITE_UINT_FIELD(subPlanId);
WRITE_NODE_FIELD(plan);
}
void void
OutMultiProject(OUTFUNC_ARGS) OutMultiProject(OUTFUNC_ARGS)
{ {

View File

@ -199,6 +199,7 @@ ReadDistributedPlan(READFUNC_ARGS)
{ {
READ_LOCALS(DistributedPlan); READ_LOCALS(DistributedPlan);
READ_UINT64_FIELD(planId);
READ_INT_FIELD(operation); READ_INT_FIELD(operation);
READ_BOOL_FIELD(hasReturning); READ_BOOL_FIELD(hasReturning);
@ -211,12 +212,26 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_NODE_FIELD(insertTargetList); READ_NODE_FIELD(insertTargetList);
READ_OID_FIELD(targetRelationId); READ_OID_FIELD(targetRelationId);
READ_NODE_FIELD(subPlanList);
READ_NODE_FIELD(planningError); READ_NODE_FIELD(planningError);
READ_DONE(); READ_DONE();
} }
READFUNC_RET
ReadDistributedSubPlan(READFUNC_ARGS)
{
READ_LOCALS(DistributedSubPlan);
READ_UINT_FIELD(subPlanId);
READ_NODE_FIELD(plan);
READ_DONE();
}
READFUNC_RET READFUNC_RET
ReadShardInterval(READFUNC_ARGS) ReadShardInterval(READFUNC_ARGS)
{ {

View File

@ -44,6 +44,7 @@ extern void RegisterNodes(void);
extern READFUNC_RET ReadJob(READFUNC_ARGS); extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS); extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS);
extern READFUNC_RET ReadDistributedSubPlan(READFUNC_ARGS);
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
@ -57,6 +58,7 @@ extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
extern void OutJob(OUTFUNC_ARGS); extern void OutJob(OUTFUNC_ARGS);
extern void OutDistributedPlan(OUTFUNC_ARGS); extern void OutDistributedPlan(OUTFUNC_ARGS);
extern void OutDistributedSubPlan(OUTFUNC_ARGS);
extern void OutShardInterval(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS);
extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS);
@ -79,6 +81,7 @@ extern void OutMultiExtendedOp(OUTFUNC_ARGS);
extern void CopyNodeJob(COPYFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS);
extern void CopyNodeDistributedPlan(COPYFUNC_ARGS); extern void CopyNodeDistributedPlan(COPYFUNC_ARGS);
extern void CopyNodeDistributedSubPlan(COPYFUNC_ARGS);
extern void CopyNodeShardInterval(COPYFUNC_ARGS); extern void CopyNodeShardInterval(COPYFUNC_ARGS);
extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
extern void CopyNodeShardPlacement(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS);

View File

@ -56,6 +56,7 @@ typedef enum CitusNodeTag
T_Job, T_Job,
T_MapMergeJob, T_MapMergeJob,
T_DistributedPlan, T_DistributedPlan,
T_DistributedSubPlan,
T_Task, T_Task,
T_TaskExecution, T_TaskExecution,
T_ShardInterval, T_ShardInterval,

View File

@ -224,6 +224,9 @@ typedef struct DistributedPlan
{ {
CitusNode type; CitusNode type;
/* unique identifier of the plan within the session */
uint64 planId;
/* type of command to execute (SELECT/INSERT/...) */ /* type of command to execute (SELECT/INSERT/...) */
CmdType operation; CmdType operation;
@ -251,6 +254,9 @@ typedef struct DistributedPlan
/* target relation of an INSERT ... SELECT via the coordinator */ /* target relation of an INSERT ... SELECT via the coordinator */
Oid targetRelationId; Oid targetRelationId;
/* list of subplans to execute before the distributed query */
List *subPlanList;
/* /*
* NULL if this a valid plan, an error description otherwise. This will * NULL if this a valid plan, an error description otherwise. This will
* e.g. be set if SQL features are present that a planner doesn't support, * e.g. be set if SQL features are present that a planner doesn't support,
@ -260,6 +266,21 @@ typedef struct DistributedPlan
} DistributedPlan; } DistributedPlan;
/*
* DistributedSubPlan contains a subplan of a distributed plan. Subplans are
* executed before the distributed query and their results are written to
* temporary files. This is used to execute CTEs and subquery joins that
* cannot be distributed.
*/
typedef struct DistributedSubPlan
{
CitusNode type;
uint32 subPlanId;
PlannedStmt *plan;
} DistributedSubPlan;
/* OperatorCacheEntry contains information for each element in OperatorCache */ /* OperatorCacheEntry contains information for each element in OperatorCache */
typedef struct OperatorCacheEntry typedef struct OperatorCacheEntry
{ {