From 7d1191954d59b81bccb96f7b00c4426884db5fff Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 4 Dec 2017 18:44:43 +0100 Subject: [PATCH] Add DistributedSubPlan node --- .../distributed/utils/citus_copyfuncs.c | 14 +++++++++++++ .../distributed/utils/citus_nodefuncs.c | 2 ++ .../distributed/utils/citus_outfuncs.c | 15 +++++++++++++ .../distributed/utils/citus_readfuncs.c | 15 +++++++++++++ src/include/distributed/citus_nodefuncs.h | 3 +++ src/include/distributed/citus_nodes.h | 1 + .../distributed/multi_physical_planner.h | 21 +++++++++++++++++++ 7 files changed, 71 insertions(+) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index d2ea8c9fb..5ee49a8b7 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -101,6 +101,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) { DECLARE_FROM_AND_NEW_NODE(DistributedPlan); + COPY_SCALAR_FIELD(planId); COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(hasReturning); @@ -112,10 +113,23 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(insertSelectSubquery); COPY_NODE_FIELD(insertTargetList); COPY_SCALAR_FIELD(targetRelationId); + + COPY_NODE_FIELD(subPlanList); + COPY_NODE_FIELD(planningError); } +void +CopyNodeDistributedSubPlan(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(DistributedSubPlan); + + COPY_SCALAR_FIELD(subPlanId); + COPY_NODE_FIELD(plan); +} + + void CopyNodeShardInterval(COPYFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index cfdd52846..dc6160b6f 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -32,6 +32,7 @@ static const char *CitusNodeTagNamesD[] = { "Job", "MapMergeJob", "DistributedPlan", + "DistributedSubPlan", "Task", "TaskExecution", "ShardInterval", @@ -384,6 +385,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a, const ExtensibleNodeMethods nodeMethods[] = { DEFINE_NODE_METHODS(DistributedPlan), + DEFINE_NODE_METHODS(DistributedSubPlan), DEFINE_NODE_METHODS(Job), DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(MapMergeJob), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 45ae1ea31..433007770 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -175,6 +175,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); + WRITE_UINT64_FIELD(planId); WRITE_INT_FIELD(operation); WRITE_BOOL_FIELD(hasReturning); @@ -187,10 +188,24 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(insertTargetList); WRITE_OID_FIELD(targetRelationId); + WRITE_NODE_FIELD(subPlanList); + WRITE_NODE_FIELD(planningError); } +void +OutDistributedSubPlan(OUTFUNC_ARGS) +{ + WRITE_LOCALS(DistributedSubPlan); + + WRITE_NODE_TYPE("DISTRIBUTEDSUBPLAN"); + + WRITE_UINT_FIELD(subPlanId); + WRITE_NODE_FIELD(plan); +} + + void OutMultiProject(OUTFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index f3f9adc5d..4e8ddb1ef 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -199,6 +199,7 @@ ReadDistributedPlan(READFUNC_ARGS) { READ_LOCALS(DistributedPlan); + READ_UINT64_FIELD(planId); READ_INT_FIELD(operation); READ_BOOL_FIELD(hasReturning); @@ -211,12 +212,26 @@ ReadDistributedPlan(READFUNC_ARGS) READ_NODE_FIELD(insertTargetList); READ_OID_FIELD(targetRelationId); + READ_NODE_FIELD(subPlanList); + READ_NODE_FIELD(planningError); READ_DONE(); } +READFUNC_RET +ReadDistributedSubPlan(READFUNC_ARGS) +{ + READ_LOCALS(DistributedSubPlan); + + READ_UINT_FIELD(subPlanId); + READ_NODE_FIELD(plan); + + READ_DONE(); +} + + READFUNC_RET ReadShardInterval(READFUNC_ARGS) { diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 451803e2b..969140abd 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -44,6 +44,7 @@ extern void RegisterNodes(void); extern READFUNC_RET ReadJob(READFUNC_ARGS); extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS); +extern READFUNC_RET ReadDistributedSubPlan(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); @@ -57,6 +58,7 @@ extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); extern void OutJob(OUTFUNC_ARGS); extern void OutDistributedPlan(OUTFUNC_ARGS); +extern void OutDistributedSubPlan(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); @@ -79,6 +81,7 @@ extern void OutMultiExtendedOp(OUTFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS); extern void CopyNodeDistributedPlan(COPYFUNC_ARGS); +extern void CopyNodeDistributedSubPlan(COPYFUNC_ARGS); extern void CopyNodeShardInterval(COPYFUNC_ARGS); extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 4e057290a..b25082472 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -56,6 +56,7 @@ typedef enum CitusNodeTag T_Job, T_MapMergeJob, T_DistributedPlan, + T_DistributedSubPlan, T_Task, T_TaskExecution, T_ShardInterval, diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ee183c3f9..cacd80d05 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -224,6 +224,9 @@ typedef struct DistributedPlan { CitusNode type; + /* unique identifier of the plan within the session */ + uint64 planId; + /* type of command to execute (SELECT/INSERT/...) */ CmdType operation; @@ -251,6 +254,9 @@ typedef struct DistributedPlan /* target relation of an INSERT ... SELECT via the coordinator */ Oid targetRelationId; + /* list of subplans to execute before the distributed query */ + List *subPlanList; + /* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support, @@ -260,6 +266,21 @@ typedef struct DistributedPlan } DistributedPlan; +/* + * DistributedSubPlan contains a subplan of a distributed plan. Subplans are + * executed before the distributed query and their results are written to + * temporary files. This is used to execute CTEs and subquery joins that + * cannot be distributed. + */ +typedef struct DistributedSubPlan +{ + CitusNode type; + + uint32 subPlanId; + PlannedStmt *plan; +} DistributedSubPlan; + + /* OperatorCacheEntry contains information for each element in OperatorCache */ typedef struct OperatorCacheEntry {