From fa18899cf96028b478f4a5cf0248c118007fb83e Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Wed, 2 Aug 2017 08:24:00 +0300 Subject: [PATCH] Remove serialization/deserialization of multiplan node (#1477) introduces copy functions for Citus MultiPlan nodes. uses ExtensibleNode mechanism to store MultiPlan data drops serialiazation of MultiPlans --- .../executor/multi_server_executor.c | 5 +- .../executor/multi_task_tracker_executor.c | 1 + .../distributed/planner/multi_planner.c | 88 +++--- .../distributed/utils/citus_copyfuncs.c | 272 ++++++++++++++++++ .../distributed/utils/citus_nodefuncs.c | 5 +- .../distributed/utils/citus_outfuncs.c | 50 ++++ .../distributed/utils/citus_readfuncs.c | 33 +++ src/include/distributed/citus_nodefuncs.h | 15 + src/include/distributed/citus_nodes.h | 1 + .../distributed/multi_server_executor.h | 1 + 10 files changed, 417 insertions(+), 54 deletions(-) create mode 100644 src/backend/distributed/utils/citus_copyfuncs.c diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 79249f54a..a7b2c42a0 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -170,7 +170,8 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus) uint32 nodeCount = list_length(task->taskPlacementList); uint32 nodeIndex = 0; - TaskExecution *taskExecution = palloc0(sizeof(TaskExecution)); + TaskExecution *taskExecution = CitusMakeNode(TaskExecution); + taskExecution->jobId = task->jobId; taskExecution->taskId = task->taskId; taskExecution->nodeCount = nodeCount; @@ -235,7 +236,7 @@ CleanupTaskExecution(TaskExecution *taskExecution) pfree(taskExecution->taskStatusArray); pfree(taskExecution->connectionIdArray); pfree(taskExecution->fileDescriptorArray); - memset(taskExecution, 0, sizeof(TaskExecution)); + pfree(taskExecution); } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 0a8b4e3d1..00fcf8f3e 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2711,6 +2711,7 @@ TrackerCleanupResources(HTAB *taskTrackerHash, HTAB *transmitTrackerHash, TaskExecution *taskExecution = task->taskExecution; CleanupTaskExecution(taskExecution); + task->taskExecution = NULL; } /* diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 81d1a547f..1f61dc2ef 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -70,13 +70,12 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin plannerRestrictionContext); static void AssignRTEIdentities(Query *queryTree); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); -static Node * SerializeMultiPlan(struct MultiPlan *multiPlan); -static MultiPlan * DeserializeMultiPlan(Node *node); static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static void CheckNodeIsDumpable(Node *node); +static Node * CheckNodeCopyAndSerialization(Node *node); static List * CopyPlanParamList(List *originalPlanParamList); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); @@ -412,60 +411,17 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query MultiPlan * GetMultiPlan(CustomScan *customScan) { + Node *node = NULL; MultiPlan *multiPlan = NULL; Assert(list_length(customScan->custom_private) == 1); - multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private)); + node = (Node *) linitial(customScan->custom_private); + Assert(CitusIsA(node, MultiPlan)); - return multiPlan; -} + node = CheckNodeCopyAndSerialization(node); - -/* - * SerializeMultiPlan returns the string representing the distributed plan in a - * Const node. - * - * Note that this should be improved for 9.6+, we we can copy trees efficiently. - * I.e. we should introduce copy support for relevant node types, and just - * return the MultiPlan as-is for 9.6. - */ -static Node * -SerializeMultiPlan(MultiPlan *multiPlan) -{ - char *serializedMultiPlan = NULL; - Const *multiPlanData = NULL; - - serializedMultiPlan = nodeToString(multiPlan); - - multiPlanData = makeNode(Const); - multiPlanData->consttype = CSTRINGOID; - multiPlanData->constlen = strlen(serializedMultiPlan); - multiPlanData->constvalue = CStringGetDatum(serializedMultiPlan); - multiPlanData->constbyval = false; - multiPlanData->location = -1; - - return (Node *) multiPlanData; -} - - -/* - * DeserializeMultiPlan returns the deserialized distributed plan from the string - * representation in a Const node. - */ -static MultiPlan * -DeserializeMultiPlan(Node *node) -{ - Const *multiPlanData = NULL; - char *serializedMultiPlan = NULL; - MultiPlan *multiPlan = NULL; - - Assert(IsA(node, Const)); - multiPlanData = (Const *) node; - serializedMultiPlan = DatumGetCString(multiPlanData->constvalue); - - multiPlan = (MultiPlan *) stringToNode(serializedMultiPlan); - Assert(CitusIsA(multiPlan, MultiPlan)); + multiPlan = (MultiPlan *) node; return multiPlan; } @@ -521,7 +477,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } - multiPlanData = SerializeMultiPlan(multiPlan); + multiPlanData = (Node *) multiPlan; customScan->custom_private = list_make1(multiPlanData); customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; @@ -666,6 +622,36 @@ CheckNodeIsDumpable(Node *node) } +/* + * CheckNodeCopyAndSerialization checks copy/dump/read functions + * for nodes and returns copy of the input. + * + * It is only active when assertions are enabled, otherwise it returns + * the input directly. We use this to confirm that our serialization + * and copy logic produces the correct plan during regression tests. + * + * It does not check string equality on node dumps due to differences + * in some Postgres types. + */ +static Node * +CheckNodeCopyAndSerialization(Node *node) +{ +#ifdef USE_ASSERT_CHECKING + char *out = nodeToString(node); + Node *deserializedNode = (Node *) stringToNode(out); + Node *nodeCopy = copyObject(deserializedNode); + char *outCopy = nodeToString(nodeCopy); + + pfree(out); + pfree(outCopy); + + return nodeCopy; +#else + return node; +#endif +} + + /* * multi_join_restriction_hook is a hook called by postgresql standard planner * to notify us about various planning information regarding joins. We use diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c new file mode 100644 index 000000000..b93f6b434 --- /dev/null +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -0,0 +1,272 @@ +/*------------------------------------------------------------------------- + * + * citus_copyfuncs.c + * Citus specific node copy functions + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2012-2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + + +#include "distributed/citus_nodefuncs.h" +#include "distributed/multi_server_executor.h" +#include "utils/datum.h" + + +/* + * Macros to simplify copying of different kinds of fields. Use these + * wherever possible to reduce the chance for silly typos. Note that these + * hard-wire the convention that the local variables in a Copy routine are + * named 'newnode' and 'from'. + */ +static inline Node * +CitusSetTag(Node *node, int tag) +{ + CitusNode *citus_node = (CitusNode *) node; + citus_node->citus_tag = tag; + return node; +} + + +#define DECLARE_FROM_AND_NEW_NODE(nodeTypeName) \ + nodeTypeName * newnode = (nodeTypeName *) \ + CitusSetTag((Node *) target_node, T_ ## nodeTypeName); \ + nodeTypeName *from = (nodeTypeName *) source_node + +/* Copy a simple scalar field (int, float, bool, enum, etc) */ +#define COPY_SCALAR_FIELD(fldname) \ + (newnode->fldname = from->fldname) + +/* Copy a field that is a pointer to some kind of Node or Node tree */ +#define COPY_NODE_FIELD(fldname) \ + (newnode->fldname = copyObject(from->fldname)) + +/* Copy a field that is a pointer to a C string, or perhaps NULL */ +#define COPY_STRING_FIELD(fldname) \ + (newnode->fldname = from->fldname ? pstrdup(from->fldname) : (char *) NULL) + +/* Copy a node array. Target array is also allocated. */ +#define COPY_NODE_ARRAY(fldname, type, count) \ + do { \ + int i = 0; \ + newnode->fldname = (type **) palloc(count * sizeof(type *)); \ + for (i = 0; i < count; ++i) \ + { \ + newnode->fldname[i] = copyObject(from->fldname[i]); \ + } \ + } \ + while (0) + +/* Copy a scalar array. Target array is also allocated. */ +#define COPY_SCALAR_ARRAY(fldname, type, count) \ + do { \ + int i = 0; \ + newnode->fldname = (type *) palloc(count * sizeof(type)); \ + for (i = 0; i < count; ++i) \ + { \ + newnode->fldname[i] = from->fldname[i]; \ + } \ + } \ + while (0) + + +static void +copyJobInfo(Job *newnode, Job *from) +{ + COPY_SCALAR_FIELD(jobId); + COPY_NODE_FIELD(jobQuery); + COPY_NODE_FIELD(taskList); + COPY_NODE_FIELD(dependedJobList); + COPY_SCALAR_FIELD(subqueryPushdown); + COPY_SCALAR_FIELD(requiresMasterEvaluation); + COPY_SCALAR_FIELD(deferredPruning); +} + + +void +CopyNodeJob(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(Job); + + copyJobInfo(newnode, from); +} + + +void +CopyNodeMultiPlan(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(MultiPlan); + + COPY_SCALAR_FIELD(operation); + COPY_SCALAR_FIELD(hasReturning); + + COPY_NODE_FIELD(workerJob); + COPY_NODE_FIELD(masterQuery); + COPY_SCALAR_FIELD(routerExecutable); + + COPY_NODE_FIELD(insertSelectSubquery); + COPY_NODE_FIELD(insertTargetList); + COPY_SCALAR_FIELD(targetRelationId); + COPY_NODE_FIELD(planningError); +} + + +void +CopyNodeShardInterval(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(ShardInterval); + + COPY_SCALAR_FIELD(relationId); + COPY_SCALAR_FIELD(storageType); + COPY_SCALAR_FIELD(valueTypeId); + COPY_SCALAR_FIELD(valueTypeLen); + COPY_SCALAR_FIELD(valueByVal); + COPY_SCALAR_FIELD(minValueExists); + COPY_SCALAR_FIELD(maxValueExists); + + if (from->minValueExists) + { + newnode->minValue = datumCopy(from->minValue, + from->valueByVal, + from->valueTypeLen); + } + if (from->maxValueExists) + { + newnode->maxValue = datumCopy(from->maxValue, + from->valueByVal, + from->valueTypeLen); + } + + COPY_SCALAR_FIELD(shardId); +} + + +void +CopyNodeMapMergeJob(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(MapMergeJob); + int arrayLength = 0; + + copyJobInfo(&newnode->job, &from->job); + + COPY_NODE_FIELD(reduceQuery); + COPY_SCALAR_FIELD(partitionType); + COPY_NODE_FIELD(partitionColumn); + COPY_SCALAR_FIELD(partitionCount); + COPY_SCALAR_FIELD(sortedShardIntervalArrayLength); + + arrayLength = from->sortedShardIntervalArrayLength; + + /* now build & read sortedShardIntervalArray */ + COPY_NODE_ARRAY(sortedShardIntervalArray, ShardInterval, arrayLength); + + COPY_NODE_FIELD(mapTaskList); + COPY_NODE_FIELD(mergeTaskList); +} + + +void +CopyNodeShardPlacement(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(ShardPlacement); + + COPY_SCALAR_FIELD(placementId); + COPY_SCALAR_FIELD(shardId); + COPY_SCALAR_FIELD(shardLength); + COPY_SCALAR_FIELD(shardState); + COPY_SCALAR_FIELD(groupId); + COPY_STRING_FIELD(nodeName); + COPY_SCALAR_FIELD(nodePort); + COPY_SCALAR_FIELD(partitionMethod); + COPY_SCALAR_FIELD(colocationGroupId); + COPY_SCALAR_FIELD(representativeValue); +} + + +void +CopyNodeGroupShardPlacement(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(GroupShardPlacement); + + COPY_SCALAR_FIELD(placementId); + COPY_SCALAR_FIELD(shardId); + COPY_SCALAR_FIELD(shardLength); + COPY_SCALAR_FIELD(shardState); + COPY_SCALAR_FIELD(groupId); +} + + +void +CopyNodeRelationShard(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(RelationShard); + + COPY_SCALAR_FIELD(relationId); + COPY_SCALAR_FIELD(shardId); +} + + +void +CopyNodeTask(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(Task); + + COPY_SCALAR_FIELD(taskType); + COPY_SCALAR_FIELD(jobId); + COPY_SCALAR_FIELD(taskId); + COPY_STRING_FIELD(queryString); + COPY_SCALAR_FIELD(anchorShardId); + COPY_NODE_FIELD(taskPlacementList); + COPY_NODE_FIELD(dependedTaskList); + COPY_SCALAR_FIELD(partitionId); + COPY_SCALAR_FIELD(upstreamTaskId); + COPY_NODE_FIELD(shardInterval); + COPY_SCALAR_FIELD(assignmentConstrained); + COPY_SCALAR_FIELD(shardId); + COPY_NODE_FIELD(taskExecution); + COPY_SCALAR_FIELD(upsertQuery); + COPY_SCALAR_FIELD(replicationModel); + COPY_SCALAR_FIELD(insertSelectQuery); + COPY_NODE_FIELD(relationShardList); +} + + +void +CopyNodeTaskExecution(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(TaskExecution); + + COPY_SCALAR_FIELD(jobId); + COPY_SCALAR_FIELD(taskId); + COPY_SCALAR_FIELD(nodeCount); + + COPY_SCALAR_ARRAY(taskStatusArray, TaskExecStatus, from->nodeCount); + COPY_SCALAR_ARRAY(transmitStatusArray, TransmitExecStatus, from->nodeCount); + COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount); + COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount); + + COPY_SCALAR_FIELD(connectStartTime); + COPY_SCALAR_FIELD(currentNodeIndex); + COPY_SCALAR_FIELD(querySourceNodeIndex); + COPY_SCALAR_FIELD(dataFetchTaskIndex); + COPY_SCALAR_FIELD(failureCount); +} + + +void +CopyNodeDeferredErrorMessage(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(DeferredErrorMessage); + + COPY_SCALAR_FIELD(code); + COPY_STRING_FIELD(message); + COPY_STRING_FIELD(detail); + COPY_STRING_FIELD(hint); + COPY_STRING_FIELD(filename); + COPY_SCALAR_FIELD(linenumber); + COPY_STRING_FIELD(functionname); +} diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 19820a0bd..9b6491504 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -16,6 +16,7 @@ #include "distributed/errormessage.h" #include "distributed/metadata_cache.h" #include "distributed/multi_planner.h" +#include "distributed/multi_server_executor.h" static const char *CitusNodeTagNamesD[] = { "MultiNode", @@ -32,6 +33,7 @@ static const char *CitusNodeTagNamesD[] = { "MapMergeJob", "MultiPlan", "Task", + "TaskExecution", "ShardInterval", "ShardPlacement", "RelationShard", @@ -361,7 +363,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a, { \ #type, \ sizeof(type), \ - CopyUnsupportedCitusNode, \ + CopyNode##type, \ EqualUnsupportedCitusNode, \ Out##type, \ Read##type \ @@ -388,6 +390,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(ShardPlacement), DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(Task), + DEFINE_NODE_METHODS(TaskExecution), DEFINE_NODE_METHODS(DeferredErrorMessage), DEFINE_NODE_METHODS(GroupShardPlacement), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 41d514e89..b4bb5bb30 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -26,6 +26,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" +#include "distributed/multi_server_executor.h" #include "distributed/master_metadata_utility.h" #include "lib/stringinfo.h" #include "nodes/plannodes.h" @@ -52,6 +53,11 @@ #define WRITE_INT_FIELD(fldname) \ appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname) +/* Write an 64-bit integer field (anything written as ":fldname %d") */ +#define WRITE_INT64_FIELD(fldname) \ + appendStringInfo(str, " :" CppAsString(fldname) " " INT64_FORMAT, node->fldname) + + /* Write an unsigned integer field (anything written as ":fldname %u") */ #define WRITE_UINT_FIELD(fldname) \ appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname) @@ -106,6 +112,27 @@ _outBitmapset(str, node->fldname)) +/* Write an integer array (anything written as ":fldname (%d, %d") */ +#define WRITE_INT_ARRAY(fldname, count) \ + appendStringInfo(str, " :" CppAsString(fldname) " ("); \ + { \ + int i;\ + for (i = 0; i < count; i++) \ + { \ + if (i > 0) \ + { \ + appendStringInfo(str, ", "); \ + } \ + appendStringInfo(str, "%d", node->fldname[i]); \ + }\ + }\ + appendStringInfo(str, ")") + + +/* Write an enum array (anything written as ":fldname (%d, %d") */ +#define WRITE_ENUM_ARRAY(fldname, count) WRITE_INT_ARRAY(fldname, count) + + #define booltostr(x) ((x) ? "true" : "false") @@ -417,6 +444,29 @@ OutTask(OUTFUNC_ARGS) } +void +OutTaskExecution(OUTFUNC_ARGS) +{ + WRITE_LOCALS(TaskExecution); + WRITE_NODE_TYPE("TASKEXECUTION"); + + WRITE_UINT64_FIELD(jobId); + WRITE_UINT_FIELD(taskId); + WRITE_UINT_FIELD(nodeCount); + + WRITE_ENUM_ARRAY(taskStatusArray, node->nodeCount); + WRITE_ENUM_ARRAY(transmitStatusArray, node->nodeCount); + WRITE_INT_ARRAY(connectionIdArray, node->nodeCount); + WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount); + + WRITE_INT64_FIELD(connectStartTime); + WRITE_UINT_FIELD(currentNodeIndex); + WRITE_UINT_FIELD(querySourceNodeIndex); + WRITE_INT_FIELD(dataFetchTaskIndex); + WRITE_UINT_FIELD(failureCount); +} + + void OutDeferredErrorMessage(OUTFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index f4c1e3747..849c54ba1 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -16,6 +16,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/errormessage.h" #include "distributed/multi_planner.h" +#include "distributed/multi_server_executor.h" #include "nodes/parsenodes.h" #include "nodes/readfuncs.h" @@ -58,6 +59,12 @@ CitusSetTag(Node *node, int tag) token = pg_strtok(&length); /* get field value */ \ local_node->fldname = atoi(token) +/* Read an 64-bit integer field (anything written as ":fldname %d") */ +#define READ_INT64_FIELD(fldname) \ + token = pg_strtok(&length); /* skip :fldname */ \ + token = pg_strtok(&length); /* get field value */ \ + local_node->fldname = (int64) strtoll(token, NULL, 10) + /* Read an unsigned integer field (anything written as ":fldname %u") */ #define READ_UINT_FIELD(fldname) \ token = pg_strtok(&length); /* skip :fldname */ \ @@ -119,6 +126,23 @@ CitusSetTag(Node *node, int tag) (void) token; /* in case not used elsewhere */ \ local_node->fldname = nodeRead(NULL, 0) + /* Read an integer field (anything written as ":fldname %d") */ + #define READ_ENUM_ARRAY(fldname, count, enumtype) \ + token = pg_strtok(&length); /* skip :fldname */ \ + token = pg_strtok(&length); /* skip ( */ \ + { \ + int i = 0; \ + for (i = 0; i < count; i++ ) \ + { \ + token = pg_strtok(&length); /* get field value */ \ + local_node->fldname[i] = (enumtype) atoi(token); \ + } \ + } \ + token = pg_strtok(&length); /* skip ) */ \ + (void) token + +#define READ_INT_ARRAY(fldname, count) READ_ENUM_ARRAY(fldname, count, int32) + /* Routine exit */ #define READ_DONE() \ return; @@ -147,6 +171,8 @@ readJobInfo(Job *local_node) { READ_TEMP_LOCALS(); + CitusSetTag((Node *) local_node, T_Job); + READ_UINT64_FIELD(jobId); READ_NODE_FIELD(jobQuery); READ_NODE_FIELD(taskList); @@ -330,6 +356,13 @@ ReadTask(READFUNC_ARGS) } +READFUNC_RET +ReadTaskExecution(READFUNC_ARGS) +{ + ereport(ERROR, (errmsg("unexpected read request for TaskExecution node"))); +} + + READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS) { diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 5dcd26cd6..97072fb65 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -39,6 +39,8 @@ extern void RegisterNodes(void); #define READFUNC_RET void #define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node +#define COPYFUNC_ARGS struct ExtensibleNode *target_node, const struct \ + ExtensibleNode *source_node extern READFUNC_RET ReadJob(READFUNC_ARGS); extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS); @@ -47,6 +49,7 @@ extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); +extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS); extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS); @@ -59,6 +62,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); +extern void OutTaskExecution(OUTFUNC_ARGS); extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutGroupShardPlacement(OUTFUNC_ARGS); @@ -73,4 +77,15 @@ extern void OutMultiPartition(OUTFUNC_ARGS); extern void OutMultiCartesianProduct(OUTFUNC_ARGS); extern void OutMultiExtendedOp(OUTFUNC_ARGS); +extern void CopyNodeJob(COPYFUNC_ARGS); +extern void CopyNodeMultiPlan(COPYFUNC_ARGS); +extern void CopyNodeShardInterval(COPYFUNC_ARGS); +extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); +extern void CopyNodeShardPlacement(COPYFUNC_ARGS); +extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS); +extern void CopyNodeRelationShard(COPYFUNC_ARGS); +extern void CopyNodeTask(COPYFUNC_ARGS); +extern void CopyNodeTaskExecution(COPYFUNC_ARGS); +extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS); + #endif /* CITUS_NODEFUNCS_H */ diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 829652abf..1497cf626 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -57,6 +57,7 @@ typedef enum CitusNodeTag T_MapMergeJob, T_MultiPlan, T_Task, + T_TaskExecution, T_ShardInterval, T_ShardPlacement, T_RelationShard, diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index b6997496a..5c97d2575 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -116,6 +116,7 @@ typedef enum */ struct TaskExecution { + CitusNode type; uint64 jobId; uint32 taskId;