Use extensible node API, abstract read-/out- funcs

This introduces macros to wrap differences between 9.5 and 9.6 read-
and out- functions for our custom nodes, allowing better code reuse
for those functions.

Additionally, if running on PostgreSQL 9.6, this change will register
extensible node types for all our custom nodes, permitting us to rely
on the standard PostgreSQL nodeToString, stringToNode, and readNode
functions.

Long-term, this change lets us avoid copy-pasting a ton of readfuncs
code as well as well as some other static methods we've been copying
in the past.
pull/858/head
Andres Freund 2016-03-03 19:42:37 -08:00 committed by Jason Petersen
parent a18e757703
commit 8c67429e58
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
13 changed files with 769 additions and 3119 deletions

1
.gitattributes vendored
View File

@ -28,7 +28,6 @@ configure -whitespace
src/backend/distributed/utils/citus_outfuncs.c -citus-style
src/backend/distributed/utils/citus_read.c -citus-style
src/backend/distributed/utils/citus_readfuncs_95.c -citus-style
src/backend/distributed/utils/citus_readfuncs_96.c -citus-style
src/backend/distributed/utils/ruleutils_95.c -citus-style
src/backend/distributed/utils/ruleutils_96.c -citus-style
src/include/distributed/citus_nodes.h -citus-style

View File

@ -18,6 +18,7 @@
#include "commands/explain.h"
#include "executor/executor.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/commit_protocol.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
@ -129,6 +130,9 @@ _PG_init(void)
*/
RegisterCitusConfigVariables();
/* make our additional node types known */
RegisterNodes();
/* intercept planner */
planner_hook = multi_planner;

View File

@ -11,9 +11,31 @@
#include "postgres.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/metadata_cache.h"
static const char *CitusNodeTagNamesD[] = {
"MultiNode",
"MultiTreeRoot",
"MultiProject",
"MultiCollect",
"MultiSelect",
"MultiTable",
"MultiJoin",
"MultiPartition",
"MultiCartesianProduct",
"MultiExtendedOp",
"Job",
"MapMergeJob",
"MultiPlan",
"Task",
"ShardInterval",
"ShardPlacement"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_extradata_container);
@ -307,3 +329,84 @@ citus_extradata_container(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
#if (PG_VERSION_NUM >= 90600)
static void
CopyUnsupportedCitusNode(struct ExtensibleNode *newnode,
const struct ExtensibleNode *oldnode)
{
ereport(ERROR, (errmsg("not implemented")));
}
static bool
EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
const struct ExtensibleNode *b)
{
ereport(ERROR, (errmsg("not implemented")));
}
/* *INDENT-OFF* */
#define DEFINE_NODE_METHODS(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
Read##type \
}
#define DEFINE_NODE_METHODS_NO_READ(type) \
{ \
#type, \
sizeof(type), \
CopyUnsupportedCitusNode, \
EqualUnsupportedCitusNode, \
Out##type, \
ReadUnsupportedCitusNode \
}
/* *INDENT-ON* */
const ExtensibleNodeMethods nodeMethods[] =
{
DEFINE_NODE_METHODS(MultiPlan),
DEFINE_NODE_METHODS(Job),
DEFINE_NODE_METHODS(ShardInterval),
DEFINE_NODE_METHODS(MapMergeJob),
DEFINE_NODE_METHODS(ShardPlacement),
DEFINE_NODE_METHODS(Task),
/* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode),
DEFINE_NODE_METHODS_NO_READ(MultiTreeRoot),
DEFINE_NODE_METHODS_NO_READ(MultiProject),
DEFINE_NODE_METHODS_NO_READ(MultiCollect),
DEFINE_NODE_METHODS_NO_READ(MultiSelect),
DEFINE_NODE_METHODS_NO_READ(MultiTable),
DEFINE_NODE_METHODS_NO_READ(MultiJoin),
DEFINE_NODE_METHODS_NO_READ(MultiPartition),
DEFINE_NODE_METHODS_NO_READ(MultiCartesianProduct),
DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp)
};
#endif
void
RegisterNodes(void)
{
#if (PG_VERSION_NUM >= 90600)
int off;
StaticAssertExpr(lengthof(nodeMethods) == lengthof(CitusNodeTagNamesD),
"number of node methods and names do not match");
for (off = 0; off < lengthof(nodeMethods); off++)
{
RegisterExtensibleNodeMethods(&nodeMethods[off]);
}
#endif
}

View File

@ -38,10 +38,21 @@
* routine.
*/
/* Store const reference to raw input node in local named 'node' */
#define WRITE_LOCALS(nodeTypeName) \
const nodeTypeName *node = (const nodeTypeName *) raw_node
/* Write the label for the node type */
#if (PG_VERSION_NUM >= 90600)
#define WRITE_NODE_TYPE(nodelabel) \
(void) 0
#else
#define WRITE_NODE_TYPE(nodelabel) \
appendStringInfoString(str, nodelabel)
#endif
/* Write an integer field (anything written as ":fldname %d") */
#define WRITE_INT_FIELD(fldname) \
appendStringInfo(str, " :" CppAsString(fldname) " %d", node->fldname)
@ -83,7 +94,7 @@
/* Write a character-string (possibly NULL) field */
#define WRITE_STRING_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outToken(str, node->fldname))
outToken(str, node->fldname))
/* Write a parse location field (actually same as INT case) */
#define WRITE_LOCATION_FIELD(fldname) \
@ -92,7 +103,7 @@
/* Write a Node field */
#define WRITE_NODE_FIELD(fldname) \
(appendStringInfo(str, " :" CppAsString(fldname) " "), \
_outNode(str, node->fldname))
outNode(str, node->fldname))
/* Write a bitmapset field */
#define WRITE_BITMAPSET_FIELD(fldname) \
@ -102,18 +113,18 @@
#define booltostr(x) ((x) ? "true" : "false")
static void _outNode(StringInfo str, const void *obj);
#if (PG_VERSION_NUM < 90600)
static void outNode(StringInfo str, const void *obj);
/*
* _outToken
* outToken
* Convert an ordinary string (eg, an identifier) into a form that
* will be decoded back to a plain token by read.c's functions.
*
* If a null or empty string is given, it is encoded as "<>".
*/
static void
_outToken(StringInfo str, const char *s)
outToken(StringInfo str, const char *s)
{
if (s == NULL || *s == '\0')
{
@ -166,7 +177,7 @@ _outList(StringInfo str, const List *node)
*/
if (IsA(node, List))
{
_outNode(str, lfirst(lc));
outNode(str, lfirst(lc));
if (lnext(lc))
appendStringInfoChar(str, ' ');
}
@ -187,7 +198,7 @@ _outList(StringInfo str, const List *node)
* Print the value of a Datum given its type.
*/
static void
_outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
{
Size length,
i;
@ -218,38 +229,49 @@ _outDatum(StringInfo str, Datum value, int typlen, bool typbyval)
}
}
#endif
/*****************************************************************************
* Output routines for Citus node types
*****************************************************************************/
static void
_outMultiUnaryNode(StringInfo str, const MultiUnaryNode *node)
OutMultiUnaryNodeFields(StringInfo str, const MultiUnaryNode *node)
{
WRITE_NODE_FIELD(childNode);
}
static void
_outMultiBinaryNode(StringInfo str, const MultiBinaryNode *node)
OutMultiBinaryNodeFields(StringInfo str, const MultiBinaryNode *node)
{
WRITE_NODE_FIELD(leftChildNode);
WRITE_NODE_FIELD(rightChildNode);
}
static void
_outMultiTreeRoot(StringInfo str, const MultiTreeRoot *node)
void
OutMultiNode(OUTFUNC_ARGS)
{
WRITE_NODE_TYPE("MULTITREEROOT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
WRITE_NODE_TYPE("MULTINODE");
}
static void
_outMultiPlan(StringInfo str, const MultiPlan *node)
void
OutMultiTreeRoot(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTreeRoot);
WRITE_NODE_TYPE("MULTITREEROOT");
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
void
OutMultiPlan(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPlan);
WRITE_NODE_TYPE("MULTIPLAN");
WRITE_NODE_FIELD(workerJob);
@ -258,87 +280,95 @@ _outMultiPlan(StringInfo str, const MultiPlan *node)
}
static void
_outMultiProject(StringInfo str, const MultiProject *node)
void
OutMultiProject(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiProject);
WRITE_NODE_TYPE("MULTIPROJECT");
WRITE_NODE_FIELD(columnList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCollect(StringInfo str, const MultiCollect *node)
void
OutMultiCollect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCollect);
WRITE_NODE_TYPE("MULTICOLLECT");
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiSelect(StringInfo str, const MultiSelect *node)
void
OutMultiSelect(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiSelect);
WRITE_NODE_TYPE("MULTISELECT");
WRITE_NODE_FIELD(selectClauseList);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiTable(StringInfo str, const MultiTable *node)
void
OutMultiTable(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiTable);
WRITE_NODE_TYPE("MULTITABLE");
WRITE_OID_FIELD(relationId);
WRITE_INT_FIELD(rangeTableId);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiJoin(StringInfo str, const MultiJoin *node)
void
OutMultiJoin(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiJoin);
WRITE_NODE_TYPE("MULTIJOIN");
WRITE_NODE_FIELD(joinClauseList);
WRITE_ENUM_FIELD(joinRuleType, JoinRuleType);
WRITE_ENUM_FIELD(joinType, JoinType);
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiPartition(StringInfo str, const MultiPartition *node)
void
OutMultiPartition(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiPartition);
WRITE_NODE_TYPE("MULTIPARTITION");
WRITE_NODE_FIELD(partitionColumn);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outMultiCartesianProduct(StringInfo str, const MultiCartesianProduct *node)
void
OutMultiCartesianProduct(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiCartesianProduct);
WRITE_NODE_TYPE("MULTICARTESIANPRODUCT");
_outMultiBinaryNode(str, (const MultiBinaryNode *) node);
OutMultiBinaryNodeFields(str, (const MultiBinaryNode *) node);
}
static void
_outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
void
OutMultiExtendedOp(OUTFUNC_ARGS)
{
WRITE_LOCALS(MultiExtendedOp);
WRITE_NODE_TYPE("MULTIEXTENDEDOP");
WRITE_NODE_FIELD(targetList);
@ -348,11 +378,11 @@ _outMultiExtendedOp(StringInfo str, const MultiExtendedOp *node)
WRITE_NODE_FIELD(limitOffset);
WRITE_NODE_FIELD(havingQual);
_outMultiUnaryNode(str, (const MultiUnaryNode *) node);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}
static void
_outJobInfo(StringInfo str, const Job *node)
OutJobFields(StringInfo str, const Job *node)
{
WRITE_UINT64_FIELD(jobId);
WRITE_NODE_FIELD(jobQuery);
@ -362,18 +392,20 @@ _outJobInfo(StringInfo str, const Job *node)
}
static void
_outJob(StringInfo str, const Job *node)
void
OutJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(Job);
WRITE_NODE_TYPE("JOB");
_outJobInfo(str, node);
OutJobFields(str, node);
}
static void
_outShardInterval(StringInfo str, const ShardInterval *node)
void
OutShardInterval(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardInterval);
WRITE_NODE_TYPE("SHARDINTERVAL");
WRITE_OID_FIELD(relationId);
@ -388,27 +420,28 @@ _outShardInterval(StringInfo str, const ShardInterval *node)
if (!node->minValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->minValue, node->valueTypeLen, node->valueByVal);
appendStringInfoString(str, " :maxValue ");
if (!node->maxValueExists)
appendStringInfoString(str, "<>");
else
_outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
WRITE_UINT64_FIELD(shardId);
}
static void
_outMapMergeJob(StringInfo str, const MapMergeJob *node)
void
OutMapMergeJob(OUTFUNC_ARGS)
{
WRITE_LOCALS(MapMergeJob);
int arrayLength = node->sortedShardIntervalArrayLength;
int i;
WRITE_NODE_TYPE("MAPMERGEJOB");
_outJobInfo(str, (Job *) node);
OutJobFields(str, (Job *) node);
WRITE_NODE_FIELD(reduceQuery);
WRITE_ENUM_FIELD(partitionType, PartitionType);
WRITE_NODE_FIELD(partitionColumn);
@ -417,9 +450,7 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
for (i = 0; i < arrayLength; ++i)
{
ShardInterval *writeElement = node->sortedShardIntervalArray[i];
_outShardInterval(str, writeElement);
outNode(str, node->sortedShardIntervalArray[i]);
}
WRITE_NODE_FIELD(mapTaskList);
@ -427,9 +458,10 @@ _outMapMergeJob(StringInfo str, const MapMergeJob *node)
}
static void
_outShardPlacement(StringInfo str, const ShardPlacement *node)
void
OutShardPlacement(OUTFUNC_ARGS)
{
WRITE_LOCALS(ShardPlacement);
WRITE_NODE_TYPE("SHARDPLACEMENT");
WRITE_UINT64_FIELD(placementId);
@ -441,9 +473,10 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node)
}
static void
_outTask(StringInfo str, const Task *node)
void
OutTask(OUTFUNC_ARGS)
{
WRITE_LOCALS(Task);
WRITE_NODE_TYPE("TASK");
WRITE_ENUM_FIELD(taskType, TaskType);
@ -462,13 +495,14 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(requiresMasterEvaluation);
}
#if (PG_VERSION_NUM < 90600)
/*
* _outNode -
* outNode -
* converts a Node into ascii string and append it to 'str'
*/
static void
_outNode(StringInfo str, const void *obj)
outNode(StringInfo str, const void *obj)
{
if (obj == NULL)
{
@ -487,91 +521,91 @@ _outNode(StringInfo str, const void *obj)
case T_MultiTreeRoot:
appendStringInfoChar(str, '{');
_outMultiTreeRoot(str, obj);
OutMultiTreeRoot(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiProject:
appendStringInfoChar(str, '{');
_outMultiProject(str, obj);
OutMultiProject(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCollect:
appendStringInfoChar(str, '{');
_outMultiCollect(str, obj);
OutMultiCollect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiSelect:
appendStringInfoChar(str, '{');
_outMultiSelect(str, obj);
OutMultiSelect(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiTable:
appendStringInfoChar(str, '{');
_outMultiTable(str, obj);
OutMultiTable(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiJoin:
appendStringInfoChar(str, '{');
_outMultiJoin(str, obj);
OutMultiJoin(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPartition:
appendStringInfoChar(str, '{');
_outMultiPartition(str, obj);
OutMultiPartition(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiCartesianProduct:
appendStringInfoChar(str, '{');
_outMultiCartesianProduct(str, obj);
OutMultiCartesianProduct(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiExtendedOp:
appendStringInfoChar(str, '{');
_outMultiExtendedOp(str, obj);
OutMultiExtendedOp(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Job:
appendStringInfoChar(str, '{');
_outJob(str, obj);
OutJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MapMergeJob:
appendStringInfoChar(str, '{');
_outMapMergeJob(str, obj);
OutMapMergeJob(str, obj);
appendStringInfoChar(str, '}');
break;
case T_MultiPlan:
appendStringInfoChar(str, '{');
_outMultiPlan(str, obj);
OutMultiPlan(str, obj);
appendStringInfoChar(str, '}');
break;
case T_Task:
appendStringInfoChar(str, '{');
_outTask(str, obj);
OutTask(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardInterval:
appendStringInfoChar(str, '{');
_outShardInterval(str, obj);
OutShardInterval(str, obj);
appendStringInfoChar(str, '}');
break;
case T_ShardPlacement:
appendStringInfoChar(str, '{');
_outShardPlacement(str, obj);
OutShardPlacement(str, obj);
appendStringInfoChar(str, '}');
break;
@ -581,6 +615,7 @@ _outNode(StringInfo str, const void *obj)
}
}
#endif
/*
* CitusNodeToString -
@ -589,9 +624,13 @@ _outNode(StringInfo str, const void *obj)
char *
CitusNodeToString(const void *obj)
{
#if (PG_VERSION_NUM >= 90600)
return nodeToString(obj);
#else
StringInfoData str;
initStringInfo(&str);
_outNode(&str, obj);
outNode(&str, obj);
return str.data;
#endif
}

View File

@ -29,6 +29,35 @@
#include "nodes/value.h"
/*
* For 9.6 onwards, we use 9.6's extensible node system, thus there's no need
* to copy various routines anymore. In that case, replace these functions
* with plain wrappers.
*/
#if (PG_VERSION_NUM >= 90600)
void *
CitusStringToNode(char *str)
{
return stringToNode(str);
}
char *
citus_pg_strtok(int *length)
{
return pg_strtok(length);
}
void *
CitusNodeRead(char *token, int tok_len)
{
return nodeRead(token, tok_len);
}
#else
/* Static state for citus_pg_strtok */
static char *citus_pg_strtok_ptr = NULL;
@ -63,7 +92,7 @@ CitusStringToNode(char *str)
/*
* citus_pg_strtok is a copy of postgres' pg_strtok routine, referencing
* citus_pg_strtok_ptr instead of pg_strtok_ptr as state.
*/
*/
char *
citus_pg_strtok(int *length)
{
@ -346,3 +375,5 @@ CitusNodeRead(char *token, int tok_len)
return (void *) result;
}
#endif /* (PG_VERSION_NUM < 90600) */

View File

@ -0,0 +1,383 @@
/*-------------------------------------------------------------------------
*
* citus_readfuncs.c
* Citus specific node functions
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2012-2015, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "distributed/citus_nodefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/readfuncs.h"
/*
* Macros to simplify reading of different kinds of fields. Use these
* wherever possible to reduce the chance for silly typos. Note that these
* hard-wire conventions about the names of the local variables in a Read
* routine.
*/
/* Macros for declaring appropriate local variables */
/* A few guys need only local_node */
#if (PG_VERSION_NUM >= 90600)
static inline Node *
CitusSetTag(Node *node, int tag)
{
CitusNode *citus_node = (CitusNode *) node;
citus_node->citus_tag = tag;
return node;
}
/* *INDENT-OFF* */
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = (nodeTypeName *) CitusSetTag((Node *) node, T_##nodeTypeName)
#else
#define READ_LOCALS_NO_FIELDS(nodeTypeName) \
nodeTypeName *local_node = CitusMakeNode(nodeTypeName)
#endif
/* And a few guys need only the citus_pg_strtok support fields */
#define READ_TEMP_LOCALS() \
char *token; \
int length
/* ... but most need both */
#define READ_LOCALS(nodeTypeName) \
READ_LOCALS_NO_FIELDS(nodeTypeName); \
READ_TEMP_LOCALS()
/* Read an integer field (anything written as ":fldname %d") */
#define READ_INT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoi(token)
/* Read an unsigned integer field (anything written as ":fldname %u") */
#define READ_UINT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoui(token)
/* XXX: CITUS Read an uint64 field (anything written as ":fldname %u") */
#define READ_UINT64_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atoull(token)
/* Read an OID field (don't hard-wire assumption that OID is same as uint) */
#define READ_OID_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atooid(token)
/* Read a char field (ie, one ascii character) */
#define READ_CHAR_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = token[0]
/* Read an enumerated-type field that was written as an integer code */
#define READ_ENUM_FIELD(fldname, enumtype) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = (enumtype) atoi(token)
/* Read a float field */
#define READ_FLOAT_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = atof(token)
/* Read a boolean field */
#define READ_BOOL_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = strtobool(token)
/* Read a character-string field */
#define READ_STRING_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
local_node->fldname = nullable_string(token, length)
/* Read a parse location field (and throw away the value, per notes above) */
#define READ_LOCATION_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
token = citus_pg_strtok(&length); /* get field value */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = -1 /* set field to "unknown" */
/* Read a Node field XXX: Citus: replaced call to nodeRead with CitusNodeRead */
#define READ_NODE_FIELD(fldname) \
token = citus_pg_strtok(&length); /* skip :fldname */ \
(void) token; /* in case not used elsewhere */ \
local_node->fldname = CitusNodeRead(NULL, 0)
/* Routine exit */
#if (PG_VERSION_NUM >= 90600)
#define READ_DONE() \
return;
#else
#define READ_DONE() \
return (Node *) local_node
#endif
/*
* NOTE: use atoi() to read values written with %d, or atoui() to read
* values written with %u in outfuncs.c. An exception is OID values,
* for which use atooid(). (As of 7.1, outfuncs.c writes OIDs as %u,
* but this will probably change in the future.)
*/
#define atoui(x) ((unsigned int) strtoul((x), NULL, 10))
#define atooid(x) ((Oid) strtoul((x), NULL, 10))
/* XXX: Citus */
#define atoull(x) ((uint64) strtoull((x), NULL, 10))
#define strtobool(x) ((*(x) == 't') ? true : false)
#define nullable_string(token,length) \
((length) == 0 ? NULL : debackslash(token, length))
static void
readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
READFUNC_RET
ReadJob(READFUNC_ARGS)
{
READ_LOCALS_NO_FIELDS(Job);
readJobInfo(local_node);
READ_DONE();
}
READFUNC_RET
ReadMultiPlan(READFUNC_ARGS)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
READFUNC_RET
ReadShardInterval(READFUNC_ARGS)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
READFUNC_RET
ReadMapMergeJob(READFUNC_ARGS)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
/* can't use READ_NODE_FIELD, no field names */
local_node->sortedShardIntervalArray[i] = CitusNodeRead(NULL, 0);
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
READFUNC_RET
ReadShardPlacement(READFUNC_ARGS)
{
READ_LOCALS(ShardPlacement);
READ_OID_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
READFUNC_RET
ReadTask(READFUNC_ARGS)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
READFUNC_RET
ReadUnsupportedCitusNode(READFUNC_ARGS)
{
ereport(ERROR, (errmsg("not implemented")));
}
#if (PG_VERSION_NUM < 90600)
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
#endif
#if (PG_VERSION_NUM >= 90600)
/* *INDENT-ON* */
/*
* For 9.6+ we can just use the, now extensible, parseNodeString(). Before
* that citus_readfuncs_$ver.c has a version specific implementation.
*/
Node *
CitusParseNodeString(void)
{
return parseNodeString();
}
#endif

View File

@ -144,8 +144,6 @@
((length) == 0 ? NULL : debackslash(token, length))
static Datum readDatum(bool typbyval);
/*
* _readBitmapset
*/
@ -1368,216 +1366,6 @@ _readTableSampleClause(void)
}
/* XXX: BEGIN Citus Nodes */
static void
_readJobInfo(Job *local_node)
{
READ_TEMP_LOCALS();
READ_UINT64_FIELD(jobId);
READ_NODE_FIELD(jobQuery);
READ_NODE_FIELD(taskList);
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
}
static Job *
_readJob(void)
{
READ_LOCALS_NO_FIELDS(Job);
_readJobInfo(local_node);
READ_DONE();
}
static MultiPlan *
_readMultiPlan(void)
{
READ_LOCALS(MultiPlan);
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_DONE();
}
static ShardInterval *
_readShardInterval(void)
{
READ_LOCALS(ShardInterval);
READ_OID_FIELD(relationId);
READ_CHAR_FIELD(storageType);
READ_OID_FIELD(valueTypeId);
READ_INT_FIELD(valueTypeLen);
READ_BOOL_FIELD(valueByVal);
READ_BOOL_FIELD(minValueExists);
READ_BOOL_FIELD(maxValueExists);
token = citus_pg_strtok(&length); /* skip :minValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->minValue = readDatum(local_node->valueByVal);
token = citus_pg_strtok(&length); /* skip :maxValue */
if (!local_node->minValueExists)
token = citus_pg_strtok(&length); /* skip "<>" */
else
local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId);
READ_DONE();
}
static MapMergeJob *
_readMapMergeJob(void)
{
int arrayLength;
int i;
READ_LOCALS(MapMergeJob);
_readJobInfo(&local_node->job);
READ_NODE_FIELD(reduceQuery);
READ_ENUM_FIELD(partitionType, PartitionType);
READ_NODE_FIELD(partitionColumn);
READ_UINT_FIELD(partitionCount);
READ_INT_FIELD(sortedShardIntervalArrayLength);
arrayLength = local_node->sortedShardIntervalArrayLength;
/* now build & read sortedShardIntervalArray */
local_node->sortedShardIntervalArray =
(ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *));
for (i = 0; i < arrayLength; ++i)
{
local_node->sortedShardIntervalArray[i] = _readShardInterval();
}
READ_NODE_FIELD(mapTaskList);
READ_NODE_FIELD(mergeTaskList);
READ_DONE();
}
static ShardPlacement *
_readShardPlacement(void)
{
READ_LOCALS(ShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
READ_DONE();
}
static Task *
_readTask(void)
{
READ_LOCALS(Task);
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependedTaskList);
READ_UINT_FIELD(partitionId);
READ_UINT_FIELD(upstreamTaskId);
READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE();
}
/* XXX: END Citus Nodes */
/*
* readDatum
*
* Given a string representation of a constant, recreate the appropriate
* Datum. The string representation embeds length info, but not byValue,
* so we must be told that.
*/
static Datum
readDatum(bool typbyval)
{
Size length,
i;
int tokenLength;
char *token;
Datum res;
char *s;
/*
* read the actual length of the value
*/
token = citus_pg_strtok(&tokenLength);
length = atoui(token);
token = citus_pg_strtok(&tokenLength); /* read the '[' */
if (token == NULL || token[0] != '[')
elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
if (typbyval)
{
if (length > (Size) sizeof(Datum))
elog(ERROR, "byval datum but length = %zu", length);
res = (Datum) 0;
s = (char *) (&res);
for (i = 0; i < (Size) sizeof(Datum); i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
}
else if (length <= 0)
res = (Datum) NULL;
else
{
s = (char *) palloc(length);
for (i = 0; i < length; i++)
{
token = citus_pg_strtok(&tokenLength);
s[i] = (char) atoi(token);
}
res = PointerGetDatum(s);
}
token = citus_pg_strtok(&tokenLength); /* read the ']' */
if (token == NULL || token[0] != ']')
elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu",
token ? (const char *) token : "[NULL]", length);
return res;
}
/*
* parseNodeString
*
@ -1718,17 +1506,17 @@ CitusParseNodeString(void)
return_value = _readDeclareCursorStmt();
/* XXX: BEGIN Citus Nodes */
else if (MATCH("MULTIPLAN", 9))
return_value = _readMultiPlan();
return_value = ReadMultiPlan();
else if (MATCH("JOB", 3))
return_value = _readJob();
return_value = ReadJob();
else if (MATCH("SHARDINTERVAL", 13))
return_value = _readShardInterval();
return_value = ReadShardInterval();
else if (MATCH("MAPMERGEJOB", 11))
return_value = _readMapMergeJob();
return_value = ReadMapMergeJob();
else if (MATCH("SHARDPLACEMENT", 14))
return_value = _readShardPlacement();
return_value = ReadShardPlacement();
else if (MATCH("TASK", 4))
return_value = _readTask();
return_value = ReadTask();
/* XXX: END Citus Nodes */
else
{

File diff suppressed because it is too large Load Diff

View File

@ -37,6 +37,55 @@ extern void * CitusNodeRead(char *token, int tok_len);
/* citus_readfuncs.c */
extern Node * CitusParseNodeString(void);
extern Datum readDatum(bool typbyval);
extern void RegisterNodes(void);
/*
* Define read functions for citus nodes in a way they're usable across
* several major versions. That requires some macro-uglyness as 9.6+ is quite
* different from before.
*/
#if (PG_VERSION_NUM >= 90600)
#define READFUNC_ARGS struct ExtensibleNode *node
#define READFUNC_RET void
#else
#define READFUNC_ARGS void
#define READFUNC_RET Node *
#endif
#if (PG_VERSION_NUM >= 90600)
#define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node
#else
#define OUTFUNC_ARGS StringInfo str, const Node *raw_node
#endif
extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS);
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
extern void OutJob(OUTFUNC_ARGS);
extern void OutMultiPlan(OUTFUNC_ARGS);
extern void OutShardInterval(OUTFUNC_ARGS);
extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutMultiNode(OUTFUNC_ARGS);
extern void OutMultiTreeRoot(OUTFUNC_ARGS);
extern void OutMultiProject(OUTFUNC_ARGS);
extern void OutMultiCollect(OUTFUNC_ARGS);
extern void OutMultiSelect(OUTFUNC_ARGS);
extern void OutMultiTable(OUTFUNC_ARGS);
extern void OutMultiJoin(OUTFUNC_ARGS);
extern void OutMultiPartition(OUTFUNC_ARGS);
extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS);
#endif /* CITUS_NODEFUNCS_H */

View File

@ -3,6 +3,26 @@
* citus_nodes.h
* Additional node types, and related infrastructure, for Citus.
*
* To add a new node type to Citus, perform the following:
*
* * Add a new CitusNodeTag value to use as a tag for the node. Add
* the node's name at a corresponding offset within the array named
* CitusNodeTagNamesD at the top of citus_nodefuncs.c
*
* * Describe the node in a struct, which must have a CitusNode as
* its first element
*
* * Implement an 'outfunc' for the node in citus_outfuncs.c, using
* the macros defined within that file. This function will handle
* converting the node to a string
*
* * Implement a 'readfunc' for the node in citus_readfuncs.c, using
* the macros defined within that file. This function will handle
* converting strings into instances of the node
*
* * Use DEFINE_NODE_METHODS within the nodeMethods array (near the
* bottom of citus_nodefuncs.c) to register the node in PostgreSQL
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
@ -17,9 +37,10 @@
*
* These have to be distinct from the ideas used in postgres' nodes.h
*/
#define CITUS_NODE_TAG_START 1200
typedef enum CitusNodeTag
{
T_MultiNode = 1200, /* FIXME: perhaps use something less predicable? */
T_MultiNode = CITUS_NODE_TAG_START, /* FIXME: perhaps use something less predicable? */
T_MultiTreeRoot,
T_MultiProject,
T_MultiCollect,
@ -38,6 +59,46 @@ typedef enum CitusNodeTag
} CitusNodeTag;
const char** CitusNodeTagNames;
#if (PG_VERSION_NUM >= 90600)
#include "nodes/extensible.h"
typedef struct CitusNode
{
ExtensibleNode extensible;
CitusNodeTag citus_tag; /* for quick type determination */
} CitusNode;
#define CitusNodeTag(nodeptr) CitusNodeTagI((Node*) nodeptr)
static inline int
CitusNodeTagI(Node *node)
{
if (!IsA(node, ExtensibleNode))
{
return nodeTag(node);
}
return ((CitusNode*)(node))->citus_tag;
}
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ CitusNode *_result; \
AssertMacro((size) >= sizeof(CitusNode)); /* need the tag, at least */ \
_result = (CitusNode *) palloc0fast(size); \
_result->extensible.type = T_ExtensibleNode; \
_result->extensible.extnodename = CitusNodeTagNames[tag - CITUS_NODE_TAG_START]; \
_result->citus_tag =(int) (tag); \
_result; \
})
#else
typedef CitusNodeTag CitusNode;
/*
* nodeTag equivalent that returns the node tag for both citus and postgres
* node tag types. Needs to return int as there's no type that covers both
@ -46,12 +107,6 @@ typedef enum CitusNodeTag
#define CitusNodeTag(nodeptr) (*((const int*)(nodeptr)))
/*
* IsA equivalent that compares node tags as integers, not as enum values.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/* Citus variant of newNode(), don't use directly. */
#define CitusNewNode(size, tag) \
({ Node *_result; \
@ -61,6 +116,13 @@ typedef enum CitusNodeTag
_result; \
})
#endif
/*
* IsA equivalent that compares node tags, including Citus-specific nodes.
*/
#define CitusIsA(nodeptr,_type_) (CitusNodeTag(nodeptr) == T_##_type_)
/*
* CitusMakeNode is Citus variant of makeNode(). Use it to create nodes of

View File

@ -29,7 +29,7 @@
/* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval
{
CitusNodeTag type;
CitusNode type;
Oid relationId;
char storageType;
Oid valueTypeId; /* min/max value datum's typeId */
@ -46,7 +46,7 @@ typedef struct ShardInterval
/* In-memory representation of a tuple in pg_dist_shard_placement. */
typedef struct ShardPlacement
{
CitusNodeTag type;
CitusNode type;
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;

View File

@ -35,7 +35,7 @@
*/
typedef struct MultiNode
{
CitusNodeTag type;
CitusNode type;
struct MultiNode *parentNode;

View File

@ -114,7 +114,7 @@ typedef enum
*/
typedef struct Job
{
CitusNodeTag type;
CitusNode type;
uint64 jobId;
Query *jobQuery;
List *taskList;
@ -152,7 +152,7 @@ typedef struct TaskExecution TaskExecution;
typedef struct Task
{
CitusNodeTag type;
CitusNode type;
TaskType taskType;
uint64 jobId;
uint32 taskId;
@ -201,7 +201,7 @@ typedef struct JoinSequenceNode
*/
typedef struct MultiPlan
{
CitusNodeTag type;
CitusNode type;
Job *workerJob;
Query *masterQuery;
char *masterTableName;