diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index a087f14b6..24737f6b2 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -85,6 +85,7 @@ copyJobInfo(Job *newnode, Job *from) COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(deferredPruning); COPY_NODE_FIELD(partitionKeyValue); + COPY_NODE_FIELD(localPlannedStatements); } @@ -266,6 +267,16 @@ CopyNodeTask(COPYFUNC_ARGS) } +void +CopyNodeLocalPlannedStatement(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(LocalPlannedStatement); + + COPY_SCALAR_FIELD(shardId); + COPY_NODE_FIELD(localPlan); +} + + void CopyNodeTaskExecution(COPYFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index e86fcff9f..1209874a2 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -36,6 +36,7 @@ static const char *CitusNodeTagNamesD[] = { "DistributedPlan", "DistributedSubPlan", "Task", + "LocalPlannedStatement", "TaskExecution", "ShardInterval", "ShardPlacement", @@ -388,6 +389,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(RelationRowLock), DEFINE_NODE_METHODS(Task), + DEFINE_NODE_METHODS(LocalPlannedStatement), DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 73bbc4bae..7459d9d95 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -327,6 +327,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(deferredPruning); WRITE_NODE_FIELD(partitionKeyValue); + WRITE_NODE_FIELD(localPlannedStatements); } @@ -483,6 +484,18 @@ OutTask(OUTFUNC_ARGS) } +void +OutLocalPlannedStatement(OUTFUNC_ARGS) +{ + WRITE_LOCALS(LocalPlannedStatement); + + WRITE_NODE_TYPE("LocalPlannedStatement"); + + WRITE_UINT64_FIELD(shardId); + WRITE_NODE_FIELD(localPlan); +} + + void OutTaskExecution(OUTFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 34065217f..395ce1bf4 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -189,6 +189,7 @@ readJobInfo(Job *local_node) READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(deferredPruning); READ_NODE_FIELD(partitionKeyValue); + READ_NODE_FIELD(localPlannedStatements); } @@ -399,6 +400,18 @@ ReadTask(READFUNC_ARGS) } +READFUNC_RET +ReadLocalPlannedStatement(READFUNC_ARGS) +{ + READ_LOCALS(LocalPlannedStatement); + + READ_UINT64_FIELD(shardId); + READ_NODE_FIELD(localPlan); + + READ_DONE(); +} + + READFUNC_RET ReadTaskExecution(READFUNC_ARGS) { diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 426abe4fb..e9507bfa8 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -51,6 +51,7 @@ extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); +extern READFUNC_RET ReadLocalPlannedStatement(READFUNC_ARGS); extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS); extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS); @@ -66,6 +67,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); +extern void OutLocalPlannedStatement(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutGroupShardPlacement(OUTFUNC_ARGS); @@ -91,6 +93,7 @@ extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS); extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS); +extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index b6f10d751..f1816d7c5 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -58,6 +58,7 @@ typedef enum CitusNodeTag T_DistributedPlan, T_DistributedSubPlan, T_Task, + T_LocalPlannedStatement, T_TaskExecution, T_ShardInterval, T_ShardPlacement, diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1b2fb894b..b7a409eac 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -117,6 +117,20 @@ typedef enum RowModifyLevel ROW_MODIFY_NONCOMMUTATIVE = 3 } RowModifyLevel; + +/* + * LocalPlannedStatement represents a local plan of a shard. The scope + * for the LocalPlannedStatement is Task. + */ +typedef struct LocalPlannedStatement +{ + CitusNode type; + + uint64 shardId; + PlannedStmt *localPlan; +} LocalPlannedStatement; + + /* * Job represents a logical unit of work that contains one set of data transfers * in our physical plan. The physical planner maps each SQL query into one or @@ -135,6 +149,9 @@ typedef struct Job bool requiresMasterEvaluation; /* only applies to modify jobs */ bool deferredPruning; Const *partitionKeyValue; + + /* for local shard queries, we may save the local plan here */ + List *localPlannedStatements; } Job; @@ -153,6 +170,7 @@ typedef struct MapMergeJob } MapMergeJob; + /* * Task represents an executable unit of work. We conceptualize our tasks into * compute and data fetch task types. SQL, map, and merge tasks are considered