From d5243184672049ed6c8e014c2711d681189f3472 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Fri, 30 Sep 2016 16:03:13 -0600 Subject: [PATCH] Apply Citus changes to 9.6 readfuncs Keeping this in a separate commit for easier reference. --- .../distributed/utils/citus_readfuncs_96.c | 353 +++++++++++++----- 1 file changed, 258 insertions(+), 95 deletions(-) diff --git a/src/backend/distributed/utils/citus_readfuncs_96.c b/src/backend/distributed/utils/citus_readfuncs_96.c index da71dbb73..4c9946594 100644 --- a/src/backend/distributed/utils/citus_readfuncs_96.c +++ b/src/backend/distributed/utils/citus_readfuncs_96.c @@ -1,35 +1,26 @@ /*------------------------------------------------------------------------- * - * readfuncs.c - * Reader functions for Postgres tree nodes. + * citus_readfuncs.c + * Citus adapted reader functions for Citus & Postgres tree nodes * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * src/backend/nodes/readfuncs.c - * - * NOTES - * Path nodes do not have any readfuncs support, because we never - * have occasion to read them in. (There was once code here that - * claimed to read them, but it was broken as well as unused.) We - * never read executor state trees, either. - * - * Parse location fields are written out by outfuncs.c, but only for - * possible debugging use. When reading a location field, we discard - * the stored value and set the location field to -1 (ie, "unknown"). - * This is because nodes coming from a stored rule should not be thought - * to have a known location in the current query's text. + * Portions Copyright (c) 2012-2016, Citus Data, Inc. * *------------------------------------------------------------------------- */ #include "postgres.h" +#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90700) + #include #include "fmgr.h" #include "nodes/extensible.h" +#include "distributed/citus_nodefuncs.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/multi_physical_planner.h" #include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "nodes/readfuncs.h" @@ -46,9 +37,9 @@ /* A few guys need only local_node */ #define READ_LOCALS_NO_FIELDS(nodeTypeName) \ - nodeTypeName *local_node = makeNode(nodeTypeName) + nodeTypeName *local_node = CitusMakeNode(nodeTypeName) -/* And a few guys need only the pg_strtok support fields */ +/* And a few guys need only the citus_pg_strtok support fields */ #define READ_TEMP_LOCALS() \ char *token; \ int length @@ -60,96 +51,103 @@ /* Read an integer field (anything written as ":fldname %d") */ #define READ_INT_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = atoi(token) /* Read an unsigned integer field (anything written as ":fldname %u") */ #define READ_UINT_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = atoui(token) +/* XXX: CITUS Read an uint64 field (anything written as ":fldname %u") */ +#define READ_UINT64_FIELD(fldname) \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ + local_node->fldname = atoull(token) + /* Read an long integer field (anything written as ":fldname %ld") */ #define READ_LONG_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = atol(token) /* Read an OID field (don't hard-wire assumption that OID is same as uint) */ #define READ_OID_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = atooid(token) /* Read a char field (ie, one ascii character) */ #define READ_CHAR_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = token[0] /* Read an enumerated-type field that was written as an integer code */ #define READ_ENUM_FIELD(fldname, enumtype) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = (enumtype) atoi(token) /* Read a float field */ #define READ_FLOAT_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = atof(token) /* Read a boolean field */ #define READ_BOOL_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = strtobool(token) /* Read a character-string field */ #define READ_STRING_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ local_node->fldname = nullable_string(token, length) /* Read a parse location field (and throw away the value, per notes above) */ #define READ_LOCATION_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ - token = pg_strtok(&length); /* get field value */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* get field value */ \ (void) token; /* in case not used elsewhere */ \ local_node->fldname = -1 /* set field to "unknown" */ +/* Read a Node field XXX: Citus: replaced call to nodeRead with CitusNodeRead */ /* Read a Node field */ #define READ_NODE_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ (void) token; /* in case not used elsewhere */ \ - local_node->fldname = nodeRead(NULL, 0) + local_node->fldname = CitusNodeRead(NULL, 0) /* Read a bitmapset field */ #define READ_BITMAPSET_FIELD(fldname) \ - token = pg_strtok(&length); /* skip :fldname */ \ + token = citus_pg_strtok(&length); /* skip :fldname */ \ (void) token; /* in case not used elsewhere */ \ local_node->fldname = _readBitmapset() /* Read an attribute number array */ #define READ_ATTRNUMBER_ARRAY(fldname, len) \ - token = pg_strtok(&length); /* skip :fldname */ \ - local_node->fldname = readAttrNumberCols(len); + token = citus_pg_strtok(&length); /* skip :fldname */ \ + local_node->fldname = _readAttrNumberCols(len); /* Read an oid array */ #define READ_OID_ARRAY(fldname, len) \ - token = pg_strtok(&length); /* skip :fldname */ \ - local_node->fldname = readOidCols(len); + token = citus_pg_strtok(&length); /* skip :fldname */ \ + local_node->fldname = _readOidCols(len); /* Read an int array */ #define READ_INT_ARRAY(fldname, len) \ - token = pg_strtok(&length); /* skip :fldname */ \ - local_node->fldname = readIntCols(len); + token = citus_pg_strtok(&length); /* skip :fldname */ \ + local_node->fldname = _readIntCols(len); /* Read a bool array */ #define READ_BOOL_ARRAY(fldname, len) \ - token = pg_strtok(&length); /* skip :fldname */ \ - local_node->fldname = readBoolCols(len); + token = citus_pg_strtok(&length); /* skip :fldname */ \ + local_node->fldname = _readBoolCols(len); /* Routine exit */ #define READ_DONE() \ @@ -166,12 +164,22 @@ #define atooid(x) ((Oid) strtoul((x), NULL, 10)) +/* XXX: Citus */ +#define atoull(x) ((uint64) strtoull((x), NULL, 10)) + #define strtobool(x) ((*(x) == 't') ? true : false) #define nullable_string(token,length) \ ((length) == 0 ? NULL : debackslash(token, length)) +static Datum CitusReadDatum(bool typbyval); +static AttrNumber * _readAttrNumberCols(int numCols); +static Oid * _readOidCols(int numCols); +static int * _readIntCols(int numCols); +static bool * _readBoolCols(int numCols); + + /* * _readBitmapset */ @@ -182,13 +190,13 @@ _readBitmapset(void) READ_TEMP_LOCALS(); - token = pg_strtok(&length); + token = citus_pg_strtok(&length); if (token == NULL) elog(ERROR, "incomplete Bitmapset structure"); if (length != 1 || token[0] != '(') elog(ERROR, "unrecognized token: \"%.*s\"", length, token); - token = pg_strtok(&length); + token = citus_pg_strtok(&length); if (token == NULL) elog(ERROR, "incomplete Bitmapset structure"); if (length != 1 || token[0] != 'b') @@ -199,7 +207,7 @@ _readBitmapset(void) int val; char *endptr; - token = pg_strtok(&length); + token = citus_pg_strtok(&length); if (token == NULL) elog(ERROR, "unterminated Bitmapset structure"); if (length == 1 && token[0] == ')') @@ -213,15 +221,6 @@ _readBitmapset(void) return result; } -/* - * for use by extensions which define extensible nodes - */ -Bitmapset * -readBitmapset(void) -{ - return _readBitmapset(); -} - /* * _readQuery */ @@ -509,11 +508,11 @@ _readConst(void) READ_BOOL_FIELD(constisnull); READ_LOCATION_FIELD(location); - token = pg_strtok(&length); /* skip :constvalue */ + token = citus_pg_strtok(&length); /* skip :constvalue */ if (local_node->constisnull) - token = pg_strtok(&length); /* skip "<>" */ + token = citus_pg_strtok(&length); /* skip "<>" */ else - local_node->constvalue = readDatum(local_node->constbyval); + local_node->constvalue = CitusReadDatum(local_node->constbyval); READ_DONE(); } @@ -748,8 +747,8 @@ _readBoolExpr(void) READ_LOCALS(BoolExpr); /* do-it-yourself enum representation */ - token = pg_strtok(&length); /* skip :boolop */ - token = pg_strtok(&length); /* get field value */ + token = citus_pg_strtok(&length); /* skip :boolop */ + token = citus_pg_strtok(&length); /* get field value */ if (strncmp(token, "and", 3) == 0) local_node->boolop = AND_EXPR; else if (strncmp(token, "or", 2) == 0) @@ -1835,8 +1834,8 @@ _readCustomScan(void) READ_BITMAPSET_FIELD(custom_relids); /* Lookup CustomScanMethods by CustomName */ - token = pg_strtok(&length); /* skip methods: */ - token = pg_strtok(&length); /* CustomName */ + token = citus_pg_strtok(&length); /* skip methods: */ + token = citus_pg_strtok(&length); /* CustomName */ custom_name = nullable_string(token, length); methods = GetCustomScanMethods(custom_name, false); local_node->methods = methods; @@ -2229,8 +2228,8 @@ _readExtensibleNode(void) READ_TEMP_LOCALS(); - token = pg_strtok(&length); /* skip :extnodename */ - token = pg_strtok(&length); /* get extnodename */ + token = citus_pg_strtok(&length); /* skip :extnodename */ + token = citus_pg_strtok(&length); /* get extnodename */ extnodename = nullable_string(token, length); if (!extnodename) @@ -2247,22 +2246,170 @@ _readExtensibleNode(void) READ_DONE(); } +/* XXX: BEGIN Citus Nodes */ + +static void +_readJobInfo(Job *local_node) +{ + READ_TEMP_LOCALS(); + + READ_UINT64_FIELD(jobId); + READ_NODE_FIELD(jobQuery); + READ_NODE_FIELD(taskList); + READ_NODE_FIELD(dependedJobList); + READ_BOOL_FIELD(subqueryPushdown); +} + + +static Job * +_readJob(void) +{ + READ_LOCALS_NO_FIELDS(Job); + + _readJobInfo(local_node); + + READ_DONE(); +} + + +static MultiPlan * +_readMultiPlan(void) +{ + READ_LOCALS(MultiPlan); + + READ_NODE_FIELD(workerJob); + READ_NODE_FIELD(masterQuery); + READ_STRING_FIELD(masterTableName); + + READ_DONE(); +} + + +static ShardInterval * +_readShardInterval(void) +{ + READ_LOCALS(ShardInterval); + + + READ_OID_FIELD(relationId); + READ_CHAR_FIELD(storageType); + READ_OID_FIELD(valueTypeId); + READ_INT_FIELD(valueTypeLen); + READ_BOOL_FIELD(valueByVal); + READ_BOOL_FIELD(minValueExists); + READ_BOOL_FIELD(maxValueExists); + + token = citus_pg_strtok(&length); /* skip :minValue */ + if (!local_node->minValueExists) + token = citus_pg_strtok(&length); /* skip "<>" */ + else + local_node->minValue = CitusReadDatum(local_node->valueByVal); + + token = citus_pg_strtok(&length); /* skip :maxValue */ + if (!local_node->minValueExists) + token = citus_pg_strtok(&length); /* skip "<>" */ + else + local_node->maxValue = CitusReadDatum(local_node->valueByVal); + + READ_UINT64_FIELD(shardId); + + READ_DONE(); +} + + +static MapMergeJob * +_readMapMergeJob(void) +{ + int arrayLength; + int i; + + READ_LOCALS(MapMergeJob); + + _readJobInfo(&local_node->job); + + READ_NODE_FIELD(reduceQuery); + READ_ENUM_FIELD(partitionType, PartitionType); + READ_NODE_FIELD(partitionColumn); + READ_UINT_FIELD(partitionCount); + READ_INT_FIELD(sortedShardIntervalArrayLength); + + arrayLength = local_node->sortedShardIntervalArrayLength; + + /* now build & read sortedShardIntervalArray */ + local_node->sortedShardIntervalArray = + (ShardInterval**) palloc(arrayLength * sizeof(ShardInterval *)); + + for (i = 0; i < arrayLength; ++i) + { + local_node->sortedShardIntervalArray[i] = _readShardInterval(); + } + + READ_NODE_FIELD(mapTaskList); + READ_NODE_FIELD(mergeTaskList); + + READ_DONE(); +} + + +static ShardPlacement * +_readShardPlacement(void) +{ + READ_LOCALS(ShardPlacement); + + READ_OID_FIELD(placementId); + READ_UINT64_FIELD(shardId); + READ_UINT64_FIELD(shardLength); + READ_ENUM_FIELD(shardState, RelayFileState); + READ_STRING_FIELD(nodeName); + READ_UINT_FIELD(nodePort); + + READ_DONE(); +} + + +static Task * +_readTask(void) +{ + READ_LOCALS(Task); + + READ_ENUM_FIELD(taskType, TaskType); + READ_UINT64_FIELD(jobId); + READ_UINT_FIELD(taskId); + READ_STRING_FIELD(queryString); + READ_UINT64_FIELD(anchorShardId); + READ_NODE_FIELD(taskPlacementList); + READ_NODE_FIELD(dependedTaskList); + READ_UINT_FIELD(partitionId); + READ_UINT_FIELD(upstreamTaskId); + READ_NODE_FIELD(shardInterval); + READ_BOOL_FIELD(assignmentConstrained); + READ_NODE_FIELD(taskExecution); + READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); + + READ_DONE(); +} + + +/* XXX: END Citus Nodes */ + + /* * parseNodeString * * Given a character string representing a node tree, parseNodeString creates * the internal node structure. * - * The string to be read must already have been loaded into pg_strtok(). + * The string to be read must already have been loaded into citus_pg_strtok(). */ Node * -parseNodeString(void) +CitusParseNodeString(void) { void *return_value; READ_TEMP_LOCALS(); - token = pg_strtok(&length); + token = citus_pg_strtok(&length); #define MATCH(tokname, namelen) \ (length == namelen && memcmp(token, tokname, namelen) == 0) @@ -2477,7 +2624,21 @@ parseNodeString(void) return_value = _readAlternativeSubPlan(); else if (MATCH("EXTENSIBLENODE", 14)) return_value = _readExtensibleNode(); - else + /* XXX: BEGIN Citus Nodes */ + else if (MATCH("MULTIPLAN", 9)) + return_value = _readMultiPlan(); + else if (MATCH("JOB", 3)) + return_value = _readJob(); + else if (MATCH("SHARDINTERVAL", 13)) + return_value = _readShardInterval(); + else if (MATCH("MAPMERGEJOB", 11)) + return_value = _readMapMergeJob(); + else if (MATCH("SHARDPLACEMENT", 14)) + return_value = _readShardPlacement(); + else if (MATCH("TASK", 4)) + return_value = _readTask(); + /* XXX: END Citus Nodes */ + else { elog(ERROR, "badly formatted node string \"%.32s\"...", token); return_value = NULL; /* keep compiler quiet */ @@ -2494,8 +2655,8 @@ parseNodeString(void) * Datum. The string representation embeds length info, but not byValue, * so we must be told that. */ -Datum -readDatum(bool typbyval) +static Datum +CitusReadDatum(bool typbyval) { Size length, i; @@ -2507,10 +2668,10 @@ readDatum(bool typbyval) /* * read the actual length of the value */ - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); length = atoui(token); - token = pg_strtok(&tokenLength); /* read the '[' */ + token = citus_pg_strtok(&tokenLength); /* read the '[' */ if (token == NULL || token[0] != '[') elog(ERROR, "expected \"[\" to start datum, but got \"%s\"; length = %zu", token ? (const char *) token : "[NULL]", length); @@ -2523,7 +2684,7 @@ readDatum(bool typbyval) s = (char *) (&res); for (i = 0; i < (Size) sizeof(Datum); i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); s[i] = (char) atoi(token); } } @@ -2534,13 +2695,13 @@ readDatum(bool typbyval) s = (char *) palloc(length); for (i = 0; i < length; i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); s[i] = (char) atoi(token); } res = PointerGetDatum(s); } - token = pg_strtok(&tokenLength); /* read the ']' */ + token = citus_pg_strtok(&tokenLength); /* read the ']' */ if (token == NULL || token[0] != ']') elog(ERROR, "expected \"]\" to end datum, but got \"%s\"; length = %zu", token ? (const char *) token : "[NULL]", length); @@ -2551,8 +2712,8 @@ readDatum(bool typbyval) /* * readAttrNumberCols */ -AttrNumber * -readAttrNumberCols(int numCols) +static AttrNumber * +_readAttrNumberCols(int numCols) { int tokenLength, i; @@ -2565,7 +2726,7 @@ readAttrNumberCols(int numCols) attr_vals = (AttrNumber *) palloc(numCols * sizeof(AttrNumber)); for (i = 0; i < numCols; i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); attr_vals[i] = atoi(token); } @@ -2575,8 +2736,8 @@ readAttrNumberCols(int numCols) /* * readOidCols */ -Oid * -readOidCols(int numCols) +static Oid * +_readOidCols(int numCols) { int tokenLength, i; @@ -2589,7 +2750,7 @@ readOidCols(int numCols) oid_vals = (Oid *) palloc(numCols * sizeof(Oid)); for (i = 0; i < numCols; i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); oid_vals[i] = atooid(token); } @@ -2599,8 +2760,8 @@ readOidCols(int numCols) /* * readIntCols */ -int * -readIntCols(int numCols) +static int * +_readIntCols(int numCols) { int tokenLength, i; @@ -2613,7 +2774,7 @@ readIntCols(int numCols) int_vals = (int *) palloc(numCols * sizeof(int)); for (i = 0; i < numCols; i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); int_vals[i] = atoi(token); } @@ -2623,8 +2784,8 @@ readIntCols(int numCols) /* * readBoolCols */ -bool * -readBoolCols(int numCols) +static bool * +_readBoolCols(int numCols) { int tokenLength, i; @@ -2637,9 +2798,11 @@ readBoolCols(int numCols) bool_vals = (bool *) palloc(numCols * sizeof(bool)); for (i = 0; i < numCols; i++) { - token = pg_strtok(&tokenLength); + token = citus_pg_strtok(&tokenLength); bool_vals[i] = strtobool(token); } return bool_vals; } + +#endif