Add LocalPlannedStatement struct

pull/3388/head
Onder Kalaci 2020-01-13 10:47:20 +01:00 committed by Onder Kalaci
parent 4b5241c7b2
commit ff12df411b
7 changed files with 61 additions and 0 deletions

View File

@ -85,6 +85,7 @@ copyJobInfo(Job *newnode, Job *from)
COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(requiresMasterEvaluation);
COPY_SCALAR_FIELD(deferredPruning); COPY_SCALAR_FIELD(deferredPruning);
COPY_NODE_FIELD(partitionKeyValue); COPY_NODE_FIELD(partitionKeyValue);
COPY_NODE_FIELD(localPlannedStatements);
} }
@ -266,6 +267,16 @@ CopyNodeTask(COPYFUNC_ARGS)
} }
void
CopyNodeLocalPlannedStatement(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(LocalPlannedStatement);
COPY_SCALAR_FIELD(shardId);
COPY_NODE_FIELD(localPlan);
}
void void
CopyNodeTaskExecution(COPYFUNC_ARGS) CopyNodeTaskExecution(COPYFUNC_ARGS)
{ {

View File

@ -36,6 +36,7 @@ static const char *CitusNodeTagNamesD[] = {
"DistributedPlan", "DistributedPlan",
"DistributedSubPlan", "DistributedSubPlan",
"Task", "Task",
"LocalPlannedStatement",
"TaskExecution", "TaskExecution",
"ShardInterval", "ShardInterval",
"ShardPlacement", "ShardPlacement",
@ -388,6 +389,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(RelationRowLock), DEFINE_NODE_METHODS(RelationRowLock),
DEFINE_NODE_METHODS(Task), DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(LocalPlannedStatement),
DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(TaskExecution),
DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement), DEFINE_NODE_METHODS(GroupShardPlacement),

View File

@ -327,6 +327,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(requiresMasterEvaluation);
WRITE_BOOL_FIELD(deferredPruning); WRITE_BOOL_FIELD(deferredPruning);
WRITE_NODE_FIELD(partitionKeyValue); WRITE_NODE_FIELD(partitionKeyValue);
WRITE_NODE_FIELD(localPlannedStatements);
} }
@ -483,6 +484,18 @@ OutTask(OUTFUNC_ARGS)
} }
void
OutLocalPlannedStatement(OUTFUNC_ARGS)
{
WRITE_LOCALS(LocalPlannedStatement);
WRITE_NODE_TYPE("LocalPlannedStatement");
WRITE_UINT64_FIELD(shardId);
WRITE_NODE_FIELD(localPlan);
}
void void
OutTaskExecution(OUTFUNC_ARGS) OutTaskExecution(OUTFUNC_ARGS)
{ {

View File

@ -189,6 +189,7 @@ readJobInfo(Job *local_node)
READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(requiresMasterEvaluation);
READ_BOOL_FIELD(deferredPruning); READ_BOOL_FIELD(deferredPruning);
READ_NODE_FIELD(partitionKeyValue); READ_NODE_FIELD(partitionKeyValue);
READ_NODE_FIELD(localPlannedStatements);
} }
@ -399,6 +400,18 @@ ReadTask(READFUNC_ARGS)
} }
READFUNC_RET
ReadLocalPlannedStatement(READFUNC_ARGS)
{
READ_LOCALS(LocalPlannedStatement);
READ_UINT64_FIELD(shardId);
READ_NODE_FIELD(localPlan);
READ_DONE();
}
READFUNC_RET READFUNC_RET
ReadTaskExecution(READFUNC_ARGS) ReadTaskExecution(READFUNC_ARGS)
{ {

View File

@ -51,6 +51,7 @@ extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS); extern READFUNC_RET ReadRelationRowLock(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadLocalPlannedStatement(READFUNC_ARGS);
extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS); extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS);
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS);
@ -66,6 +67,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutRelationRowLock(OUTFUNC_ARGS); extern void OutRelationRowLock(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS);
extern void OutLocalPlannedStatement(OUTFUNC_ARGS);
extern void OutTaskExecution(OUTFUNC_ARGS); extern void OutTaskExecution(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutGroupShardPlacement(OUTFUNC_ARGS); extern void OutGroupShardPlacement(OUTFUNC_ARGS);
@ -91,6 +93,7 @@ extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS);
extern void CopyNodeRelationShard(COPYFUNC_ARGS); extern void CopyNodeRelationShard(COPYFUNC_ARGS);
extern void CopyNodeRelationRowLock(COPYFUNC_ARGS); extern void CopyNodeRelationRowLock(COPYFUNC_ARGS);
extern void CopyNodeTask(COPYFUNC_ARGS); extern void CopyNodeTask(COPYFUNC_ARGS);
extern void CopyNodeLocalPlannedStatement(COPYFUNC_ARGS);
extern void CopyNodeTaskExecution(COPYFUNC_ARGS); extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);

View File

@ -58,6 +58,7 @@ typedef enum CitusNodeTag
T_DistributedPlan, T_DistributedPlan,
T_DistributedSubPlan, T_DistributedSubPlan,
T_Task, T_Task,
T_LocalPlannedStatement,
T_TaskExecution, T_TaskExecution,
T_ShardInterval, T_ShardInterval,
T_ShardPlacement, T_ShardPlacement,

View File

@ -117,6 +117,20 @@ typedef enum RowModifyLevel
ROW_MODIFY_NONCOMMUTATIVE = 3 ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel; } RowModifyLevel;
/*
* LocalPlannedStatement represents a local plan of a shard. The scope
* for the LocalPlannedStatement is Task.
*/
typedef struct LocalPlannedStatement
{
CitusNode type;
uint64 shardId;
PlannedStmt *localPlan;
} LocalPlannedStatement;
/* /*
* Job represents a logical unit of work that contains one set of data transfers * Job represents a logical unit of work that contains one set of data transfers
* in our physical plan. The physical planner maps each SQL query into one or * in our physical plan. The physical planner maps each SQL query into one or
@ -135,6 +149,9 @@ typedef struct Job
bool requiresMasterEvaluation; /* only applies to modify jobs */ bool requiresMasterEvaluation; /* only applies to modify jobs */
bool deferredPruning; bool deferredPruning;
Const *partitionKeyValue; Const *partitionKeyValue;
/* for local shard queries, we may save the local plan here */
List *localPlannedStatements;
} Job; } Job;
@ -153,6 +170,7 @@ typedef struct MapMergeJob
} MapMergeJob; } MapMergeJob;
/* /*
* Task represents an executable unit of work. We conceptualize our tasks into * Task represents an executable unit of work. We conceptualize our tasks into
* compute and data fetch task types. SQL, map, and merge tasks are considered * compute and data fetch task types. SQL, map, and merge tasks are considered