Remove serialization/deserialization of multiplan node (#1477)

introduces copy functions for Citus MultiPlan nodes.
uses ExtensibleNode mechanism to store MultiPlan data
drops serialization of MultiPlans
pull/1534/head
Murat Tuncer 2017-08-02 08:24:00 +03:00 committed by GitHub
parent f0275fe4ae
commit fa18899cf9
10 changed files with 417 additions and 54 deletions

View File

@ -170,7 +170,8 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
uint32 nodeCount = list_length(task->taskPlacementList);
uint32 nodeIndex = 0;
TaskExecution *taskExecution = palloc0(sizeof(TaskExecution));
TaskExecution *taskExecution = CitusMakeNode(TaskExecution);
taskExecution->jobId = task->jobId;
taskExecution->taskId = task->taskId;
taskExecution->nodeCount = nodeCount;
@ -235,7 +236,7 @@ CleanupTaskExecution(TaskExecution *taskExecution)
pfree(taskExecution->taskStatusArray);
pfree(taskExecution->connectionIdArray);
pfree(taskExecution->fileDescriptorArray);
memset(taskExecution, 0, sizeof(TaskExecution));
pfree(taskExecution);
}

View File

@ -2711,6 +2711,7 @@ TrackerCleanupResources(HTAB *taskTrackerHash, HTAB *transmitTrackerHash,
TaskExecution *taskExecution = task->taskExecution;
CleanupTaskExecution(taskExecution);
task->taskExecution = NULL;
}
/*

View File

@ -70,13 +70,12 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin
plannerRestrictionContext);
static void AssignRTEIdentities(Query *queryTree);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static Node * SerializeMultiPlan(struct MultiPlan *multiPlan);
static MultiPlan * DeserializeMultiPlan(Node *node);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static void CheckNodeIsDumpable(Node *node);
static Node * CheckNodeCopyAndSerialization(Node *node);
static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
@ -412,60 +411,17 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
MultiPlan *
GetMultiPlan(CustomScan *customScan)
{
Node *node = NULL;
MultiPlan *multiPlan = NULL;
Assert(list_length(customScan->custom_private) == 1);
multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private));
node = (Node *) linitial(customScan->custom_private);
Assert(CitusIsA(node, MultiPlan));
return multiPlan;
}
node = CheckNodeCopyAndSerialization(node);
/*
* SerializeMultiPlan returns the string representing the distributed plan in a
* Const node.
*
* Note that this should be improved for 9.6+, where we can copy trees efficiently.
* I.e. we should introduce copy support for relevant node types, and just
* return the MultiPlan as-is for 9.6.
*/
static Node *
SerializeMultiPlan(MultiPlan *multiPlan)
{
char *serializedMultiPlan = NULL;
Const *multiPlanData = NULL;
serializedMultiPlan = nodeToString(multiPlan);
multiPlanData = makeNode(Const);
multiPlanData->consttype = CSTRINGOID;
multiPlanData->constlen = strlen(serializedMultiPlan);
multiPlanData->constvalue = CStringGetDatum(serializedMultiPlan);
multiPlanData->constbyval = false;
multiPlanData->location = -1;
return (Node *) multiPlanData;
}
/*
* DeserializeMultiPlan returns the deserialized distributed plan from the string
* representation in a Const node.
*/
static MultiPlan *
DeserializeMultiPlan(Node *node)
{
Const *multiPlanData = NULL;
char *serializedMultiPlan = NULL;
MultiPlan *multiPlan = NULL;
Assert(IsA(node, Const));
multiPlanData = (Const *) node;
serializedMultiPlan = DatumGetCString(multiPlanData->constvalue);
multiPlan = (MultiPlan *) stringToNode(serializedMultiPlan);
Assert(CitusIsA(multiPlan, MultiPlan));
multiPlan = (MultiPlan *) node;
return multiPlan;
}
@ -521,7 +477,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
}
}
multiPlanData = SerializeMultiPlan(multiPlan);
multiPlanData = (Node *) multiPlan;
customScan->custom_private = list_make1(multiPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
@ -666,6 +622,36 @@ CheckNodeIsDumpable(Node *node)
}
/*
 * CheckNodeCopyAndSerialization exercises the out, read and copy functions
 * of the given node and hands back the resulting copy.
 *
 * In assert-enabled builds the node is dumped to a string, read back from
 * that string, copied with copyObject() and the copy is dumped once more;
 * the copy is what the caller receives. In all other builds the input node
 * is returned unchanged.
 *
 * The two string dumps are deliberately not compared with each other,
 * because some Postgres types do not produce identical text.
 */
static Node *
CheckNodeCopyAndSerialization(Node *node)
{
#ifdef USE_ASSERT_CHECKING
	char *serialized = nodeToString(node);
	Node *roundTripped = (Node *) stringToNode(serialized);
	Node *copied = copyObject(roundTripped);

	/* dumping the copy makes sure the out functions accept copied nodes */
	char *reserialized = nodeToString(copied);

	pfree(reserialized);
	pfree(serialized);

	node = copied;
#endif

	return node;
}
/*
* multi_join_restriction_hook is a hook called by postgresql standard planner
* to notify us about various planning information regarding joins. We use

View File

@ -0,0 +1,272 @@
/*-------------------------------------------------------------------------
*
* citus_copyfuncs.c
* Citus specific node copy functions
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2012-2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/multi_server_executor.h"
#include "utils/datum.h"
/*
* Macros to simplify copying of different kinds of fields. Use these
* wherever possible to reduce the chance for silly typos. Note that these
* hard-wire the convention that the local variables in a Copy routine are
* named 'newnode' and 'from'.
*/
/*
 * CitusSetTag stores the given Citus node tag in the node, which is treated
 * as a CitusNode, and returns the same node pointer so that the call can be
 * used inside an expression.
 */
static inline Node *
CitusSetTag(Node *node, int tag)
{
	((CitusNode *) node)->citus_tag = tag;

	return node;
}
/*
 * Declare the two conventional locals of a copy routine: 'newnode' is
 * target_node with its Citus tag set to T_<nodeTypeName>, and 'from' is
 * source_node cast to the concrete node type. Must be placed where
 * declarations are allowed and where target_node/source_node are in scope.
 */
#define DECLARE_FROM_AND_NEW_NODE(nodeTypeName) \
nodeTypeName * newnode = (nodeTypeName *) \
CitusSetTag((Node *) target_node, T_ ## nodeTypeName); \
nodeTypeName *from = (nodeTypeName *) source_node
/* Copy a simple scalar field (int, float, bool, enum, etc) */
#define COPY_SCALAR_FIELD(fldname) \
(newnode->fldname = from->fldname)
/* Copy a field that is a pointer to some kind of Node or Node tree */
#define COPY_NODE_FIELD(fldname) \
(newnode->fldname = copyObject(from->fldname))
/* Copy a field that is a pointer to a C string, or perhaps NULL */
#define COPY_STRING_FIELD(fldname) \
(newnode->fldname = from->fldname ? pstrdup(from->fldname) : (char *) NULL)
/*
 * Copy a node array. A new array of 'count' pointers is palloc'd on newnode
 * and every element is copied with copyObject().
 *
 * 'count' is parenthesized so that a compound expression (e.g. a + b) sizes
 * the allocation and bounds the loop consistently, and the loop variable has
 * a name that is unlikely to shadow anything used in the 'count' expression.
 */
#define COPY_NODE_ARRAY(fldname, type, count) \
	do { \
		int arrayIndex = 0; \
		newnode->fldname = (type **) palloc((count) * sizeof(type *)); \
		for (arrayIndex = 0; arrayIndex < (count); ++arrayIndex) \
		{ \
			newnode->fldname[arrayIndex] = copyObject(from->fldname[arrayIndex]); \
		} \
	} \
	while (0)
/*
 * Copy a scalar array. A new array of 'count' elements is palloc'd on
 * newnode and every element is assigned from the source array.
 *
 * 'count' is parenthesized so that a compound expression (e.g. a + b) sizes
 * the allocation and bounds the loop consistently, and the loop variable has
 * a name that is unlikely to shadow anything used in the 'count' expression.
 */
#define COPY_SCALAR_ARRAY(fldname, type, count) \
	do { \
		int arrayIndex = 0; \
		newnode->fldname = (type *) palloc((count) * sizeof(type)); \
		for (arrayIndex = 0; arrayIndex < (count); ++arrayIndex) \
		{ \
			newnode->fldname[arrayIndex] = from->fldname[arrayIndex]; \
		} \
	} \
	while (0)
/*
 * copyJobInfo copies the Job fields from 'from' into 'newnode'. Scalars are
 * assigned, node fields are copied with copyObject(). It takes plain Job
 * pointers so that it can serve both a standalone Job (CopyNodeJob) and the
 * Job embedded in a MapMergeJob (CopyNodeMapMergeJob). It does not set the
 * node tag; callers are responsible for that.
 */
static void
copyJobInfo(Job *newnode, Job *from)
{
COPY_SCALAR_FIELD(jobId);
COPY_NODE_FIELD(jobQuery);
COPY_NODE_FIELD(taskList);
COPY_NODE_FIELD(dependedJobList);
COPY_SCALAR_FIELD(subqueryPushdown);
COPY_SCALAR_FIELD(requiresMasterEvaluation);
COPY_SCALAR_FIELD(deferredPruning);
}
/*
 * CopyNodeJob is the copy callback for Job nodes: it tags the target node as
 * T_Job and copies all Job fields via copyJobInfo().
 */
void
CopyNodeJob(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(Job);
copyJobInfo(newnode, from);
}
/*
 * CopyNodeMultiPlan is the copy callback for MultiPlan nodes. Scalar fields
 * are assigned; the worker job, master query, INSERT ... SELECT fields and
 * the planning error are copied with copyObject().
 */
void
CopyNodeMultiPlan(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(MultiPlan);
COPY_SCALAR_FIELD(operation);
COPY_SCALAR_FIELD(hasReturning);
COPY_NODE_FIELD(workerJob);
COPY_NODE_FIELD(masterQuery);
COPY_SCALAR_FIELD(routerExecutable);
COPY_NODE_FIELD(insertSelectSubquery);
COPY_NODE_FIELD(insertTargetList);
COPY_SCALAR_FIELD(targetRelationId);
COPY_NODE_FIELD(planningError);
}
/*
 * CopyNodeShardInterval is the copy callback for ShardInterval nodes.
 *
 * All scalar fields are assigned first. The min and max datums are then
 * duplicated with datumCopy(), using the interval's by-value flag and type
 * length, but only when the matching "exists" flag is set on the source;
 * otherwise the corresponding target field is not written.
 */
void
CopyNodeShardInterval(COPYFUNC_ARGS)
{
	DECLARE_FROM_AND_NEW_NODE(ShardInterval);

	/* plain scalars */
	COPY_SCALAR_FIELD(relationId);
	COPY_SCALAR_FIELD(storageType);
	COPY_SCALAR_FIELD(valueTypeId);
	COPY_SCALAR_FIELD(valueTypeLen);
	COPY_SCALAR_FIELD(valueByVal);
	COPY_SCALAR_FIELD(minValueExists);
	COPY_SCALAR_FIELD(maxValueExists);
	COPY_SCALAR_FIELD(shardId);

	/* boundary datums, present only when flagged */
	if (from->minValueExists)
	{
		newnode->minValue = datumCopy(from->minValue, from->valueByVal,
									  from->valueTypeLen);
	}

	if (from->maxValueExists)
	{
		newnode->maxValue = datumCopy(from->maxValue, from->valueByVal,
									  from->valueTypeLen);
	}
}
/*
 * CopyNodeMapMergeJob is the copy callback for MapMergeJob nodes.
 *
 * The embedded Job is copied in place with copyJobInfo(), followed by the
 * map/merge specific fields. The sorted shard interval array is rebuilt as a
 * freshly allocated array whose elements are copied one by one.
 */
void
CopyNodeMapMergeJob(COPYFUNC_ARGS)
{
	DECLARE_FROM_AND_NEW_NODE(MapMergeJob);
	int intervalCount = from->sortedShardIntervalArrayLength;

	copyJobInfo(&newnode->job, &from->job);

	COPY_NODE_FIELD(reduceQuery);
	COPY_SCALAR_FIELD(partitionType);
	COPY_NODE_FIELD(partitionColumn);
	COPY_SCALAR_FIELD(partitionCount);
	COPY_SCALAR_FIELD(sortedShardIntervalArrayLength);

	/* allocate the target array and copy every shard interval into it */
	COPY_NODE_ARRAY(sortedShardIntervalArray, ShardInterval, intervalCount);

	COPY_NODE_FIELD(mapTaskList);
	COPY_NODE_FIELD(mergeTaskList);
}
/*
 * CopyNodeShardPlacement is the copy callback for ShardPlacement nodes. All
 * fields are scalars except nodeName, which is duplicated with pstrdup()
 * (or set to NULL when the source has no name).
 */
void
CopyNodeShardPlacement(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(ShardPlacement);
COPY_SCALAR_FIELD(placementId);
COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(shardLength);
COPY_SCALAR_FIELD(shardState);
COPY_SCALAR_FIELD(groupId);
COPY_STRING_FIELD(nodeName);
COPY_SCALAR_FIELD(nodePort);
COPY_SCALAR_FIELD(partitionMethod);
COPY_SCALAR_FIELD(colocationGroupId);
COPY_SCALAR_FIELD(representativeValue);
}
/*
 * CopyNodeGroupShardPlacement is the copy callback for GroupShardPlacement
 * nodes; every copied field is a scalar.
 */
void
CopyNodeGroupShardPlacement(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(GroupShardPlacement);
COPY_SCALAR_FIELD(placementId);
COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(shardLength);
COPY_SCALAR_FIELD(shardState);
COPY_SCALAR_FIELD(groupId);
}
/*
 * CopyNodeRelationShard is the copy callback for RelationShard nodes; it
 * copies the relation id and shard id by value.
 */
void
CopyNodeRelationShard(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(RelationShard);
COPY_SCALAR_FIELD(relationId);
COPY_SCALAR_FIELD(shardId);
}
/*
 * CopyNodeTask is the copy callback for Task nodes. Scalars are assigned,
 * queryString is duplicated with pstrdup(), and the placement list, depended
 * task list, shard interval, relation shard list and taskExecution are all
 * copied with copyObject().
 */
void
CopyNodeTask(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(Task);
COPY_SCALAR_FIELD(taskType);
COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId);
COPY_STRING_FIELD(queryString);
COPY_SCALAR_FIELD(anchorShardId);
COPY_NODE_FIELD(taskPlacementList);
COPY_NODE_FIELD(dependedTaskList);
COPY_SCALAR_FIELD(partitionId);
COPY_SCALAR_FIELD(upstreamTaskId);
COPY_NODE_FIELD(shardInterval);
COPY_SCALAR_FIELD(assignmentConstrained);
COPY_SCALAR_FIELD(shardId);
COPY_NODE_FIELD(taskExecution);
COPY_SCALAR_FIELD(upsertQuery);
COPY_SCALAR_FIELD(replicationModel);
COPY_SCALAR_FIELD(insertSelectQuery);
COPY_NODE_FIELD(relationShardList);
}
/*
 * CopyNodeTaskExecution is the copy callback for TaskExecution nodes. Each
 * of the four per-node arrays (task status, transmit status, connection id,
 * file descriptor) gets a newly allocated array of from->nodeCount elements
 * whose values are copied from the source; the remaining fields are scalars.
 */
void
CopyNodeTaskExecution(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(TaskExecution);
COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(nodeCount);
COPY_SCALAR_ARRAY(taskStatusArray, TaskExecStatus, from->nodeCount);
COPY_SCALAR_ARRAY(transmitStatusArray, TransmitExecStatus, from->nodeCount);
COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount);
COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount);
COPY_SCALAR_FIELD(connectStartTime);
COPY_SCALAR_FIELD(currentNodeIndex);
COPY_SCALAR_FIELD(querySourceNodeIndex);
COPY_SCALAR_FIELD(dataFetchTaskIndex);
COPY_SCALAR_FIELD(failureCount);
}
/*
 * CopyNodeDeferredErrorMessage is the copy callback for DeferredErrorMessage
 * nodes. The code and line number are assigned; every string field is
 * duplicated with pstrdup(), with NULL strings staying NULL.
 */
void
CopyNodeDeferredErrorMessage(COPYFUNC_ARGS)
{
DECLARE_FROM_AND_NEW_NODE(DeferredErrorMessage);
COPY_SCALAR_FIELD(code);
COPY_STRING_FIELD(message);
COPY_STRING_FIELD(detail);
COPY_STRING_FIELD(hint);
COPY_STRING_FIELD(filename);
COPY_SCALAR_FIELD(linenumber);
COPY_STRING_FIELD(functionname);
}

View File

@ -16,6 +16,7 @@
#include "distributed/errormessage.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
static const char *CitusNodeTagNamesD[] = {
"MultiNode",
@ -32,6 +33,7 @@ static const char *CitusNodeTagNamesD[] = {
"MapMergeJob",
"MultiPlan",
"Task",
"TaskExecution",
"ShardInterval",
"ShardPlacement",
"RelationShard",
@ -361,7 +363,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
CopyNode##type, \
EqualUnsupportedCitusNode, \
Out##type, \
Read##type \
@ -388,6 +390,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(ShardPlacement),
DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(TaskExecution),
DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement),

View File

@ -26,6 +26,7 @@
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/master_metadata_utility.h"
#include "lib/stringinfo.h"
#include "nodes/plannodes.h"
@ -52,6 +53,11 @@
#define WRITE_INT_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
/* Write a 64-bit integer field (anything written as ":fldname " INT64_FORMAT) */
#define WRITE_INT64_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " " INT64_FORMAT, node->fldname)
/* Write an unsigned integer field (anything written as ":fldname %u") */
#define WRITE_UINT_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " %u", node->fldname)
@ -106,6 +112,27 @@
_outBitmapset(str, node->fldname))
/*
 * Write an integer array (anything written as ":fldname (%d, %d)").
 *
 * The elements are separated by ", " and an empty array is written as "()".
 * The body is wrapped in do { } while (0) so that the macro behaves as a
 * single statement (e.g. under an unbraced if/else), and 'count' is
 * parenthesized so compound expressions are evaluated as a whole.
 */
#define WRITE_INT_ARRAY(fldname, count) \
	do { \
		int arrayIndex = 0; \
		appendStringInfo(str, " :" CppAsString(fldname) " ("); \
		for (arrayIndex = 0; arrayIndex < (count); arrayIndex++) \
		{ \
			if (arrayIndex > 0) \
			{ \
				appendStringInfo(str, ", "); \
			} \
			appendStringInfo(str, "%d", node->fldname[arrayIndex]); \
		} \
		appendStringInfo(str, ")"); \
	} \
	while (0)

/* Write an enum array (anything written as ":fldname (%d, %d)") */
#define WRITE_ENUM_ARRAY(fldname, count) WRITE_INT_ARRAY(fldname, count)
#define booltostr(x) ((x) ? "true" : "false")
@ -417,6 +444,29 @@ OutTask(OUTFUNC_ARGS)
}
/*
 * OutTaskExecution writes the string form of a TaskExecution node. The four
 * per-node arrays are each written with node->nodeCount entries. Note that
 * ReadTaskExecution does not parse this output; it raises an error instead.
 */
void
OutTaskExecution(OUTFUNC_ARGS)
{
WRITE_LOCALS(TaskExecution);
WRITE_NODE_TYPE("TASKEXECUTION");
WRITE_UINT64_FIELD(jobId);
WRITE_UINT_FIELD(taskId);
WRITE_UINT_FIELD(nodeCount);
WRITE_ENUM_ARRAY(taskStatusArray, node->nodeCount);
WRITE_ENUM_ARRAY(transmitStatusArray, node->nodeCount);
WRITE_INT_ARRAY(connectionIdArray, node->nodeCount);
WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount);
WRITE_INT64_FIELD(connectStartTime);
WRITE_UINT_FIELD(currentNodeIndex);
WRITE_UINT_FIELD(querySourceNodeIndex);
WRITE_INT_FIELD(dataFetchTaskIndex);
WRITE_UINT_FIELD(failureCount);
}
void
OutDeferredErrorMessage(OUTFUNC_ARGS)
{

View File

@ -16,6 +16,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/errormessage.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
#include "nodes/parsenodes.h"
#include "nodes/readfuncs.h"
@ -58,6 +59,12 @@ CitusSetTag(Node *node, int tag)
token = pg_strtok(&length); /* get field value */ \
local_node->fldname = atoi(token)
/* Read an 64-bit integer field (anything written as ":fldname %d") */
#define READ_INT64_FIELD(fldname) \
token = pg_strtok(&length); /* skip :fldname */ \
token = pg_strtok(&length); /* get field value */ \
local_node->fldname = (int64) strtoll(token, NULL, 10)
/* Read an unsigned integer field (anything written as ":fldname %u") */
#define READ_UINT_FIELD(fldname) \
token = pg_strtok(&length); /* skip :fldname */ \
@ -119,6 +126,23 @@ CitusSetTag(Node *node, int tag)
(void) token; /* in case not used elsewhere */ \
local_node->fldname = nodeRead(NULL, 0)
/* Read an integer field (anything written as ":fldname %d") */
#define READ_ENUM_ARRAY(fldname, count, enumtype) \
token = pg_strtok(&length); /* skip :fldname */ \
token = pg_strtok(&length); /* skip ( */ \
{ \
int i = 0; \
for (i = 0; i < count; i++ ) \
{ \
token = pg_strtok(&length); /* get field value */ \
local_node->fldname[i] = (enumtype) atoi(token); \
} \
} \
token = pg_strtok(&length); /* skip ) */ \
(void) token
#define READ_INT_ARRAY(fldname, count) READ_ENUM_ARRAY(fldname, count, int32)
/* Routine exit */
#define READ_DONE() \
return;
@ -147,6 +171,8 @@ readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
CitusSetTag((Node *) local_node, T_Job);
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
@ -330,6 +356,13 @@ ReadTask(READFUNC_ARGS)
}
/*
 * ReadTaskExecution is the read callback registered for TaskExecution nodes.
 * It does not parse anything: any attempt to read a TaskExecution from its
 * string form is reported as an error.
 */
READFUNC_RET
ReadTaskExecution(READFUNC_ARGS)
{
ereport(ERROR, (errmsg("unexpected read request for TaskExecution node")));
}
READFUNC_RET
ReadDeferredErrorMessage(READFUNC_ARGS)
{

View File

@ -39,6 +39,8 @@ extern void RegisterNodes(void);
#define READFUNC_RET void
#define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node
#define COPYFUNC_ARGS struct ExtensibleNode *target_node, const struct \
ExtensibleNode *source_node
extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS);
@ -47,6 +49,7 @@ extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadTaskExecution(READFUNC_ARGS);
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS);
@ -59,6 +62,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutTaskExecution(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutGroupShardPlacement(OUTFUNC_ARGS);
@ -73,4 +77,15 @@ extern void OutMultiPartition(OUTFUNC_ARGS);
extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS);
extern void CopyNodeJob(COPYFUNC_ARGS);
extern void CopyNodeMultiPlan(COPYFUNC_ARGS);
extern void CopyNodeShardInterval(COPYFUNC_ARGS);
extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
extern void CopyNodeShardPlacement(COPYFUNC_ARGS);
extern void CopyNodeGroupShardPlacement(COPYFUNC_ARGS);
extern void CopyNodeRelationShard(COPYFUNC_ARGS);
extern void CopyNodeTask(COPYFUNC_ARGS);
extern void CopyNodeTaskExecution(COPYFUNC_ARGS);
extern void CopyNodeDeferredErrorMessage(COPYFUNC_ARGS);
#endif /* CITUS_NODEFUNCS_H */

View File

@ -57,6 +57,7 @@ typedef enum CitusNodeTag
T_MapMergeJob,
T_MultiPlan,
T_Task,
T_TaskExecution,
T_ShardInterval,
T_ShardPlacement,
T_RelationShard,

View File

@ -116,6 +116,7 @@ typedef enum
*/
struct TaskExecution
{
CitusNode type;
uint64 jobId;
uint32 taskId;