From 9a82e8f06bcfddf0d98b9ecebf1f130ce95c3735 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 20 Jan 2017 15:16:35 -0800 Subject: [PATCH 1/5] Make usage of static a bit more consistent in multi_planner.c. --- src/backend/distributed/planner/multi_planner.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index c21665d9c..5a1afbe40 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -405,7 +405,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index * CreateAndPushRestrictionContext creates a new restriction context, inserts it to the * beginning of the context list, and returns the newly created context. */ -RelationRestrictionContext * +static RelationRestrictionContext * CreateAndPushRestrictionContext(void) { RelationRestrictionContext *restrictionContext = @@ -425,7 +425,7 @@ CreateAndPushRestrictionContext(void) * CurrentRestrictionContext returns the the last restriction context from the * list. */ -RelationRestrictionContext * +static RelationRestrictionContext * CurrentRestrictionContext(void) { RelationRestrictionContext *restrictionContext = NULL; @@ -443,7 +443,7 @@ CurrentRestrictionContext(void) * PopRestrictionContext removes the most recently added restriction context from * context list. The function assumes the list is not empty. */ -void +static void PopRestrictionContext(void) { relationRestrictionContextList = list_delete_first(relationRestrictionContextList); From 557ccc6fdaa0b202f69eeba4407b9373b00886e0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 19 Jan 2017 16:45:37 -0800 Subject: [PATCH 2/5] Support for deferred error messages. It can be useful, e.g. in the upcoming prepared statement support, to be able to return an error from a function that is not raised immediately, but can later be thrown. 
That allows e.g. to attempt to plan a statment using different methods and to create good error messages in each planner, but to only error out after all planners have been run. To enable that create support for deferred error messages that can be created (supporting errorcode, message, detail, hint) in one function, and then thrown in different place. --- .../distributed/utils/citus_nodefuncs.c | 5 +- .../distributed/utils/citus_outfuncs.c | 24 ++++++++ .../distributed/utils/citus_readfuncs.c | 19 ++++++ .../distributed/utils/citus_readfuncs_95.c | 2 + src/backend/distributed/utils/errormessage.c | 58 ++++++++++++++++++ src/include/distributed/citus_nodefuncs.h | 2 + src/include/distributed/citus_nodes.h | 5 +- src/include/distributed/errormessage.h | 60 +++++++++++++++++++ 8 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/utils/errormessage.c create mode 100644 src/include/distributed/errormessage.h diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 03dda8a7c..42c7a4ece 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -13,6 +13,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodes.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/errormessage.h" #include "distributed/metadata_cache.h" #include "distributed/multi_planner.h" @@ -33,7 +34,8 @@ static const char *CitusNodeTagNamesD[] = { "Task", "ShardInterval", "ShardPlacement", - "RelationShard" + "RelationShard", + "DeferredErrorMessage" }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -383,6 +385,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(ShardPlacement), DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(Task), + DEFINE_NODE_METHODS(DeferredErrorMessage), /* nodes with only output support */ DEFINE_NODE_METHODS_NO_READ(MultiNode), diff --git 
a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index ecb4150ba..28b943848 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -22,6 +22,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/errormessage.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" @@ -515,6 +516,23 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(relationShardList); } + +void +OutDeferredErrorMessage(OUTFUNC_ARGS) +{ + WRITE_LOCALS(DeferredErrorMessage); + WRITE_NODE_TYPE("DEFERREDERRORMESSAGE"); + + WRITE_INT_FIELD(code); + WRITE_STRING_FIELD(message); + WRITE_STRING_FIELD(detail); + WRITE_STRING_FIELD(hint); + WRITE_STRING_FIELD(filename); + WRITE_INT_FIELD(linenumber); + WRITE_STRING_FIELD(functionname); +} + + #if (PG_VERSION_NUM < 90600) /* @@ -635,6 +653,12 @@ outNode(StringInfo str, const void *obj) appendStringInfoChar(str, '}'); break; + case T_DeferredErrorMessage: + appendStringInfoChar(str, '{'); + OutDeferredErrorMessage(str, obj); + appendStringInfoChar(str, '}'); + break; + default: /* fall back into postgres' normal nodeToString machinery */ appendStringInfoString(str, nodeToString(obj)); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index c430df0cb..149ae0492 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -14,6 +14,7 @@ #include #include "distributed/citus_nodefuncs.h" +#include "distributed/errormessage.h" #include "distributed/multi_planner.h" #include "nodes/parsenodes.h" #include "nodes/readfuncs.h" @@ -314,6 +315,24 @@ ReadTask(READFUNC_ARGS) READ_DONE(); } + +READFUNC_RET +ReadDeferredErrorMessage(READFUNC_ARGS) +{ + READ_LOCALS(DeferredErrorMessage); + + READ_INT_FIELD(code); + 
READ_STRING_FIELD(message); + READ_STRING_FIELD(detail); + READ_STRING_FIELD(hint); + READ_STRING_FIELD(filename); + READ_INT_FIELD(linenumber); + READ_STRING_FIELD(functionname); + + READ_DONE(); +} + + READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index b5ac97576..b9719aeff 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1519,6 +1519,8 @@ CitusParseNodeString(void) return_value = ReadRelationShard(); else if (MATCH("TASK", 4)) return_value = ReadTask(); + else if (MATCH("DEFERREDERRORMESSAGE", 20)) + return_value = ReadDeferredErrorMessage(); /* XXX: END Citus Nodes */ else { diff --git a/src/backend/distributed/utils/errormessage.c b/src/backend/distributed/utils/errormessage.c new file mode 100644 index 000000000..2482768e7 --- /dev/null +++ b/src/backend/distributed/utils/errormessage.c @@ -0,0 +1,58 @@ +/* + * errormessage.c + * Error handling related support functionality. + * + * Copyright (c) 2017, Citus Data, Inc. + */ + +#include "postgres.h" + +#include "distributed/citus_nodes.h" +#include "distributed/errormessage.h" + + +/* + * DeferredErrorInternal is a helper function for DeferredError(). + */ +DeferredErrorMessage * +DeferredErrorInternal(int code, const char *message, const char *detail, const char *hint, + const char *filename, int linenumber, const char *functionname) +{ + DeferredErrorMessage *error = CitusMakeNode(DeferredErrorMessage); + + error->code = code; + error->message = message; + error->detail = detail; + error->hint = hint; + error->filename = filename; + error->linenumber = linenumber; + error->functionname = functionname; + return error; +} + + +/* + * RaiseDeferredErrorInternal is a helper function for RaiseDeferredError(). 
+ */ +void +RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel) +{ + ErrorData *errorData = palloc0(sizeof(ErrorData)); + + errorData->sqlerrcode = error->code; + errorData->elevel = elevel; + errorData->message = pstrdup(error->message); + if (error->detail) + { + errorData->detail = pstrdup(error->detail); + } + if (error->hint) + { + errorData->hint = pstrdup(error->hint); + } + errorData->filename = pstrdup(error->filename); + errorData->lineno = error->linenumber; + errorData->funcname = error->functionname; + + ThrowErrorData(errorData); +} diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 8d5c782d4..884e61c6a 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -68,6 +68,7 @@ extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); +extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS); extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); @@ -78,6 +79,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); +extern void OutDeferredErrorMessage(OUTFUNC_ARGS); extern void OutMultiNode(OUTFUNC_ARGS); extern void OutMultiTreeRoot(OUTFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 05e7e0bd1..3a401c381 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -56,7 +56,8 @@ typedef enum CitusNodeTag T_Task, T_ShardInterval, T_ShardPlacement, - T_RelationShard + T_RelationShard, + T_DeferredErrorMessage } CitusNodeTag; @@ -99,6 +100,8 @@ CitusNodeTagI(Node *node) #else +#include "nodes/nodes.h" + typedef CitusNodeTag CitusNode; /* * nodeTag equivalent that returns the node 
tag for both citus and postgres diff --git a/src/include/distributed/errormessage.h b/src/include/distributed/errormessage.h new file mode 100644 index 000000000..26f8dd63a --- /dev/null +++ b/src/include/distributed/errormessage.h @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * + * errormessage.h + * Error handling related support functionality. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef ERRORMESSAGE_H +#define ERRORMESSAGE_H + + +#include "distributed/citus_nodes.h" + + +typedef struct DeferredErrorMessage +{ + CitusNode tag; + + int code; + const char *message; + const char *detail; + const char *hint; + const char *filename; + int linenumber; + const char *functionname; +} DeferredErrorMessage; + + +/* + * DeferredError allocates a deferred error message, that can later be emitted + * using RaiseDeferredError(). These error messages can be + * serialized/copied/deserialized, i.e. can be embedded in plans and such. + */ +#define DeferredError(code, message, detail, hint) \ + DeferredErrorInternal(code, message, detail, hint, __FILE__, __LINE__, __func__) + +DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, const + char *detail, const char *hint, + const char *filename, int linenumber, const + char *functionname); + +/* + * RaiseDeferredError emits a previously allocated error using the specified + * severity. + * + * The trickery with __builtin_constant_p/pg_unreachable aims to have the + * compiler understand that the function will not return if elevel >= ERROR. 
+ */ +#define RaiseDeferredError(error, elevel) \ + do { \ + RaiseDeferredErrorInternal(error, elevel); \ + if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ + pg_unreachable(); } \ + } while (0) + +void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); + +#endif From 7681f6ab9d322fc309fc056046e6a27641f84f87 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 19 Jan 2017 17:21:31 -0800 Subject: [PATCH 3/5] Centralize more of distributed planning into CreateDistributedPlan(). The name CreatePhysicalPlan() hasn't been accurate for a while, and the split of work between multi_planner() and CreatePhysicalPlan() doesn't seem perfect. So rename to CreateDistributedPlan() and move a bit more logic in there. --- .../distributed/planner/multi_planner.c | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 5a1afbe40..df702cd0e 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -39,9 +39,11 @@ static void CheckNodeIsDumpable(Node *node); static char * GetMultiPlanString(PlannedStmt *result); static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, struct MultiPlan *multiPlan); -static struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query, - RelationRestrictionContext * - restrictionContext); +static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, + Query *originalQuery, + Query *query, + RelationRestrictionContext * + restrictionContext); static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void); static void PopRestrictionContext(void); @@ -100,11 +102,8 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) if (needsDistributedPlanning) { - MultiPlan *physicalPlan = 
CreatePhysicalPlan(originalQuery, parse, - restrictionContext); - - /* store required data into the planned statement */ - result = MultiQueryContainerNode(result, physicalPlan); + result = CreateDistributedPlan(result, originalQuery, parse, + restrictionContext); } } PG_CATCH(); @@ -122,15 +121,15 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* - * CreatePhysicalPlan encapsulates the logic needed to transform a particular - * query into a physical plan. For modifications, queries immediately enter + * CreateDistributedPlan encapsulates the logic needed to transform a particular + * query into a distributed plan. For modifications, queries immediately enter * the physical planning stage, since they are essentially "routed" to remote * target shards. SELECT queries go through the full logical plan/optimize/ * physical plan process needed to produce distributed query plans. */ -static MultiPlan * -CreatePhysicalPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext) +static PlannedStmt * +CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) { MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query, restrictionContext); @@ -153,7 +152,8 @@ CreatePhysicalPlan(Query *originalQuery, Query *query, physicalPlan = MultiPhysicalPlanCreate(logicalPlan); } - return physicalPlan; + /* store required data into the planned statement */ + return MultiQueryContainerNode(localPlan, physicalPlan); } From c244b8ef4a164536458091257dbb799d930f7b5a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 19 Jan 2017 17:05:49 -0800 Subject: [PATCH 4/5] Make router planner error handling more flexible. So far router planner had encapsulated different functionality in MultiRouterPlanCreate. Modifications always go through router, selects sometimes. Modifications always error out if the query is unsupported, selects return NULL. 
Especially the error handling is a problem for the upcoming extension of prepared statement support. Split MultiRouterPlanCreate into CreateRouterPlan and CreateModifyPlan, and change them to not throw errors. Instead errors are now reported by setting the new MultiPlan->plannigError. Callers of router planner functionality now have to throw errors themselves if desired, but also can skip doing so. This is a pre-requisite for expanding prepared statement support. While touching all those lines, improve a number of error messages by getting them closer to the postgres error message guidelines. --- .../master/master_modify_multiple_shards.c | 6 +- .../distributed/planner/multi_planner.c | 94 +++- .../planner/multi_router_planner.c | 479 ++++++++++-------- .../distributed/utils/citus_outfuncs.c | 1 + .../distributed/utils/citus_readfuncs.c | 1 + .../distributed/multi_physical_planner.h | 8 + src/include/distributed/multi_planner.h | 1 + .../distributed/multi_router_planner.h | 10 +- .../regress/expected/multi_insert_select.out | 68 ++- .../regress/expected/multi_modifications.out | 9 +- .../expected/multi_reference_table.out | 8 +- .../regress/expected/multi_router_planner.out | 3 +- .../regress/expected/multi_shard_modify.out | 6 +- src/test/regress/expected/multi_upsert.out | 3 +- 14 files changed, 399 insertions(+), 298 deletions(-) diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 1a964208a..b65710968 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -139,7 +139,11 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) if (modifyQuery->commandType != CMD_UTILITY) { - ErrorIfModifyQueryNotSupported(modifyQuery); + DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery); + if (error) + { + RaiseDeferredError(error, ERROR); + } } /* reject queries with a returning list */ 
diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index df702cd0e..28a77fe77 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -120,40 +120,94 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) } +/* + * IsModifyCommand returns true if the query performs modifications, false + * otherwise. + */ +bool +IsModifyCommand(Query *query) +{ + CmdType commandType = query->commandType; + + if (commandType == CMD_INSERT || commandType == CMD_UPDATE || + commandType == CMD_DELETE || query->hasModifyingCTE) + { + return true; + } + + return false; +} + + /* * CreateDistributedPlan encapsulates the logic needed to transform a particular - * query into a distributed plan. For modifications, queries immediately enter - * the physical planning stage, since they are essentially "routed" to remote - * target shards. SELECT queries go through the full logical plan/optimize/ - * physical plan process needed to produce distributed query plans. + * query into a distributed plan. */ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { - MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query, - restrictionContext); - if (physicalPlan == NULL) + MultiPlan *distributedPlan = NULL; + + if (IsModifyCommand(query)) { - /* Create and optimize logical plan */ - MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query); - MultiLogicalPlanOptimize(logicalPlan); - /* - * This check is here to make it likely that all node types used in - * Citus are dumpable. Explain can dump logical and physical plans - * using the extended outfuncs infrastructure, but it's infeasible to - * test most plans. MultiQueryContainerNode always serializes the - * physical plan, so there's no need to check that separately. 
+ * Modifications are always routed through the same + * planner/executor. As there's currently no other way to plan these, + * error out if the query is unsupported. */ - CheckNodeIsDumpable((Node *) logicalPlan); + distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext); + Assert(distributedPlan); + if (distributedPlan->planningError) + { + RaiseDeferredError(distributedPlan->planningError, ERROR); + } + } + else + { + /* + * For select queries we, if router executor is enabled, first try to + * plan the query as a router query. If not supported, otherwise try + * the full blown plan/optimize/physical planing process needed to + * produce distributed query plans. + */ + if (EnableRouterExecution) + { + distributedPlan = CreateRouterPlan(originalQuery, query, restrictionContext); - /* Create the physical plan */ - physicalPlan = MultiPhysicalPlanCreate(logicalPlan); + /* for debugging it's useful to display why query was not router plannable */ + if (distributedPlan && distributedPlan->planningError) + { + RaiseDeferredError(distributedPlan->planningError, DEBUG1); + } + } + + /* router didn't yield a plan, try the full distributed planner */ + if (!distributedPlan || distributedPlan->planningError) + { + /* Create and optimize logical plan */ + MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query); + MultiLogicalPlanOptimize(logicalPlan); + + /* + * This check is here to make it likely that all node types used in + * Citus are dumpable. Explain can dump logical and physical plans + * using the extended outfuncs infrastructure, but it's infeasible to + * test most plans. MultiQueryContainerNode always serializes the + * physical plan, so there's no need to check that separately. 
+ */ + CheckNodeIsDumpable((Node *) logicalPlan); + + /* Create the physical plan */ + distributedPlan = MultiPhysicalPlanCreate(logicalPlan); + + /* distributed plan currently should always succeed or error out */ + Assert(distributedPlan && distributedPlan->planningError == NULL); + } } /* store required data into the planned statement */ - return MultiQueryContainerNode(localPlan, physicalPlan); + return MultiQueryContainerNode(localPlan, distributedPlan); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 78094bd28..68254c739 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -25,6 +25,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/deparse_shard_query.h" #include "distributed/distribution_column.h" +#include "distributed/errormessage.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -74,7 +75,8 @@ typedef struct WalkerState bool EnableRouterExecution = true; /* planner functions forward declarations */ -static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, +static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, + Query *query, RelationRestrictionContext * restrictionContext); static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery, @@ -114,58 +116,64 @@ static bool MultiRouterPlannableQuery(Query *query, static RelationRestrictionContext * CopyRelationRestrictionContext( RelationRestrictionContext *oldContext); static Node * InstantiatePartitionQual(Node *node, void *context); -static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree, - RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, - bool allReferenceTables); -static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query); -static void 
ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, - RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, - Oid * - selectPartitionColumnTableId); +static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree, + RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + bool allReferenceTables); +static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query); +static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, + RangeTblEntry *insertRte, + RangeTblEntry * + subqueryRte, + Oid * + selectPartitionColumnTableId); static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar); -static void ErrorIfQueryHasModifyingCTE(Query *queryTree); +static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); + /* - * MultiRouterPlanCreate creates a multi plan for the queries - * that includes the following: - * (i) modification queries that hit a single shard - * (ii) select queries hat can be executed on a single worker - * node and does not require any operations on the master node. - * (iii) INSERT INTO .... SELECT queries - * - * The function returns NULL if it cannot create the plan for SELECT - * queries and errors out if it cannot plan the modify queries. + * CreateRouterPlan attempts to create a router executor plan for the given + * SELECT statement. If planning fails either NULL is returned, or + * ->planningError is set to a description of the failure. 
*/ MultiPlan * -MultiRouterPlanCreate(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext) +CreateRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) { - MultiPlan *multiPlan = NULL; + Assert(EnableRouterExecution); - bool routerPlannable = MultiRouterPlannableQuery(query, restrictionContext); - if (!routerPlannable) + if (MultiRouterPlannableQuery(query, restrictionContext)) { - return NULL; + return CreateSingleTaskRouterPlan(originalQuery, query, + restrictionContext); } + /* + * TODO: Instead have MultiRouterPlannableQuery set an error describing + * why router cannot support the query. + */ + return NULL; +} + + +/* + * CreateModifyPlan attempts to create a plan the given modification + * statement. If planning fails ->planningError is set to a description of + * the failure. + */ +MultiPlan * +CreateModifyPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) +{ if (InsertSelectQuery(originalQuery)) { - multiPlan = CreateInsertSelectRouterPlan(originalQuery, restrictionContext); + return CreateInsertSelectRouterPlan(originalQuery, restrictionContext); } else { - multiPlan = CreateSingleTaskRouterPlan(originalQuery, query, restrictionContext); + return CreateSingleTaskRouterPlan(originalQuery, query, + restrictionContext); } - - /* plans created by router planner are always router executable */ - if (multiPlan != NULL) - { - multiPlan->routerExecutable = true; - } - - return multiPlan; } @@ -173,8 +181,8 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query, * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * either a modify task that changes a single shard, or a router task that returns * query results from a single worker. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable then the function - * returns NULL. 
+ * are router plannable by default. If query is not router plannable then either NULL is + * returned, or the returned plan has planningError set to a description of the problem. */ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, @@ -185,7 +193,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, Job *job = NULL; Task *task = NULL; List *placementList = NIL; - MultiPlan *multiPlan = NULL; + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) @@ -195,12 +203,23 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, if (modifyTask) { - ErrorIfModifyQueryNotSupported(query); + /* FIXME: this should probably rather be inlined into CreateModifyPlan */ + multiPlan->planningError = ModifyQuerySupported(query); + if (multiPlan->planningError) + { + return multiPlan; + } task = RouterModifyTask(originalQuery, query); + Assert(task); } else { - ErrorIfQueryHasModifyingCTE(query); + /* FIXME: this should probably rather be inlined into CreateSelectPlan */ + multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query); + if (multiPlan->planningError) + { + return multiPlan; + } task = RouterSelectTask(originalQuery, restrictionContext, &placementList); } @@ -213,10 +232,10 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, job = RouterQueryJob(originalQuery, task, placementList); - multiPlan = CitusMakeNode(MultiPlan); multiPlan->workerJob = job; multiPlan->masterQuery = NULL; multiPlan->masterTableName = NULL; + multiPlan->routerExecutable = true; return multiPlan; } @@ -237,7 +256,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ Job *workerJob = NULL; uint64 jobId = INVALID_JOB_ID; - MultiPlan *multiPlan = NULL; + MultiPlan *multiPlan = CitusMakeNode(MultiPlan); RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry 
*subqueryRte = ExtractSelectRangeTableEntry(originalQuery); Oid targetRelationId = insertRte->relid; @@ -249,8 +268,13 @@ CreateInsertSelectRouterPlan(Query *originalQuery, * Error semantics for INSERT ... SELECT queries are different than regular * modify queries. Thus, handle separately. */ - ErrorIfInsertSelectQueryNotSupported(originalQuery, insertRte, subqueryRte, - allReferenceTables); + multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte, + subqueryRte, + allReferenceTables); + if (multiPlan->planningError) + { + return multiPlan; + } /* * Plan select query for each shard in the target table. Do so by replacing the @@ -291,10 +315,10 @@ CreateInsertSelectRouterPlan(Query *originalQuery, workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* and finally the multi plan */ - multiPlan = CitusMakeNode(MultiPlan); multiPlan->workerJob = workerJob; multiPlan->masterTableName = NULL; multiPlan->masterQuery = NULL; + multiPlan->routerExecutable = true; return multiPlan; } @@ -628,18 +652,19 @@ ExtractInsertRangeTableEntry(Query *query) /* - * ErrorIfInsertSelectQueryNotSupported errors out for unsupported - * INSERT ... SELECT queries. + * InsertSelectQueryNotSupported returns NULL if the INSERT ... SELECT query + * is supported, or a description why not. */ -static void -ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, bool allReferenceTables) +static DeferredErrorMessage * +InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, bool allReferenceTables) { Query *subquery = NULL; Oid selectPartitionColumnTableId = InvalidOid; Oid targetRelationId = insertRte->relid; char targetPartitionMethod = PartitionMethod(targetRelationId); ListCell *rangeTableCell = NULL; + DeferredErrorMessage *error = NULL; /* we only do this check for INSERT ... 
SELECT queries */ AssertArg(InsertSelectQuery(queryTree)); @@ -653,8 +678,9 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->relkind == RELKIND_VIEW) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot insert into view over distributed table"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot insert into view over distributed table", + NULL, NULL); } } @@ -662,15 +688,18 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, if (contain_volatile_functions((Node *) queryTree)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Volatile functions are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "volatile functions are not allowed in INSERT ... SELECT " + "queries", + NULL, NULL); } /* we don't support LIMIT, OFFSET and WINDOW functions */ - ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery); + error = MultiTaskRouterSelectQuerySupported(subquery); + if (error) + { + return error; + } /* * If we're inserting into a reference table, all participating tables @@ -680,18 +709,23 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, { if (!allReferenceTables) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("If data inserted into a reference table, " - "all of the participating tables in the " - "INSERT INTO ... SELECT query should be " - "reference tables."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "only reference tables may be queried when targeting " + "a reference table with INSERT ... 
SELECT", + NULL, NULL); } } else { + DeferredErrorMessage *error = NULL; + /* ensure that INSERT's partition column comes from SELECT's partition column */ - ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree, insertRte, subqueryRte, - &selectPartitionColumnTableId); + error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte, + &selectPartitionColumnTableId); + if (error) + { + return error; + } /* * We expect partition column values come from colocated tables. Note that we @@ -700,22 +734,23 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, */ if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("INSERT target table and the source relation " - "of the SELECT partition column value " - "must be colocated"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT target table and the source relation of the SELECT partition " + "column value must be colocated", + NULL, NULL); } } + + return NULL; } /* - * ErrorUnsupportedMultiTaskSelectQuery errors out on queries that we support - * for single task router queries, but, cannot allow for multi task router - * queries. We do these checks recursively to prevent any wrong results. + * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used + * as the source for an INSERT ... SELECT or returns a description why not. 
*/ -static void -ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) +static DeferredErrorMessage * +MultiTaskRouterSelectQuerySupported(Query *query) { List *queryList = NIL; ListCell *queryCell = NULL; @@ -730,21 +765,19 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) /* pushing down limit per shard would yield wrong results */ if (subquery->limitCount != NULL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("LIMIT clauses are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "LIMIT clauses are not allowed in INSERT ... SELECT " + "queries", + NULL, NULL); } /* pushing down limit offest per shard would yield wrong results */ if (subquery->limitOffset != NULL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("OFFSET clauses are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "OFFSET clauses are not allowed in INSERT ... SELECT " + "queries", + NULL, NULL); } /* @@ -754,21 +787,19 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) */ if (subquery->windowClause != NULL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Window functions are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "window functions are not allowed in INSERT ... 
SELECT " + "queries", + NULL, NULL); } /* see comment on AddUninstantiatedPartitionRestriction() */ if (subquery->setOperations != NULL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Set operations are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "set operations are not allowed in INSERT ... SELECT " + "queries", + NULL, NULL); } /* @@ -779,11 +810,10 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) */ if (subquery->groupingSets != NULL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Grouping sets are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "grouping sets are not allowed in INSERT ... SELECT " + "queries", + NULL, NULL); } /* @@ -792,27 +822,29 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) */ if (subquery->hasDistinctOn) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("DISTINCT ON clauses are not allowed in " - "INSERT ... SELECT queries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "DISTINCT ON clauses are not allowed in " + "INSERT ... SELECT queries", + NULL, NULL); } } + + return NULL; } /* - * ErrorIfInsertPartitionColumnDoesNotMatchSelect checks whether the INSERTed table's - * partition column value matches with the any of the SELECTed table's partition column. + * InsertPartitionColumnMatchesSelect returns NULL if the partition column in the + * table targeted by the INSERT matches any of the SELECTed table's + * partition columns. Returns the error description if there's no match.
* - * On return without error (i.e., if partition columns match), the function also sets - * selectPartitionColumnTableId. + * On return without error (i.e., if partition columns match), the function + * also sets selectPartitionColumnTableId. */ -static void -ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *insertRte, - RangeTblEntry *subqueryRte, - Oid *selectPartitionColumnTableId) +static DeferredErrorMessage * +InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, + RangeTblEntry *subqueryRte, + Oid *selectPartitionColumnTableId) { ListCell *targetEntryCell = NULL; uint32 rangeTableId = 1; @@ -972,14 +1004,13 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse } } - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot plan INSERT INTO ... SELECT " - "because partition columns in the target table " - "and the subquery do not match"), - errdetail(errorDetailTemplate, exprDescription), - errhint("Ensure the target table's partition column has a " - "corresponding simple column reference to a distributed " - "table's partition column in the subquery."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT INTO ... SELECT partition columns in the source " + "table and subquery do not match", + psprintf(errorDetailTemplate, exprDescription), + "Ensure the target table's partition column has a " + "corresponding simple column reference to a distributed " + "table's partition column in the subquery."); } /* @@ -988,27 +1019,24 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse */ if (!IsA(targetEntry->expr, Var)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot plan INSERT INTO ... 
SELECT " - "because partition columns in the target table " - "and the subquery do not match"), - errdetail( - "The data type of the target table's partition column " - "should exactly match the data type of the " - "corresponding simple column reference in the subquery."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT INTO ... SELECT partition columns in the source " + "table and subquery do not match", + "The data type of the target table's partition column " + "should exactly match the data type of the " + "corresponding simple column reference in the subquery.", + NULL); } /* finally, check that the select target column is a partition column */ if (!IsPartitionColumn(selectTargetExpr, subquery)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot plan INSERT INTO ... SELECT " - "because partition columns in the target table " - "and the subquery do not match"), - errdetail( - "The target table's partition column " - "should correspond to a partition column " - "in the subquery."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT INTO ... SELECT partition columns in the source " + "table and subquery do not match", + "The target table's partition column should correspond " + "to a partition column in the subquery.", + NULL); } /* we can set the select relation id */ @@ -1019,11 +1047,15 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse if (!targetTableHasPartitionColumn) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot plan INSERT INTO ... SELECT " - "because the query doesn't include the target table's " - "partition column"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "INSERT INTO ... 
SELECT partition columns in the source " + "table and subquery do not match", + "the query doesn't include the target table's " + "partition column", + NULL); } + + return NULL; } @@ -1155,11 +1187,11 @@ AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn) /* - * ErrorIfModifyQueryNotSupported checks if the query contains unsupported features, - * and errors out if it does. + * ModifyQuerySupported returns NULL if the query only contains supported + * features, otherwise it returns an error description. */ -void -ErrorIfModifyQueryNotSupported(Query *queryTree) +DeferredErrorMessage * +ModifyQuerySupported(Query *queryTree) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; @@ -1185,21 +1217,18 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) */ if (queryTree->hasSubLinks == true) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("Subqueries are not supported in distributed" - " modifications."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "subqueries are not supported in distributed modifications", + NULL, NULL); } /* reject queries which include CommonTableExpr */ if (queryTree->cteList != NIL) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("Common table expressions are not supported in" - " distributed modifications."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "common table expressions are not supported in distributed " + "modifications", + NULL, NULL); } /* extract range table entries */ @@ -1227,11 +1256,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (referenceTable && !schemaNode) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("Modifications to 
reference tables are " - "supported only from the schema node."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given" + " modification", + "Modifications to reference tables are " + "supported only from the schema node.", + NULL); } queryTableCount++; @@ -1239,8 +1269,9 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) /* we do not expect to see a view in modify query */ if (rangeTableEntry->relkind == RELKIND_VIEW) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot modify views over distributed tables"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot modify views over distributed tables", + NULL, NULL); } } else if (rangeTableEntry->rtekind == RTE_VALUES) @@ -1277,10 +1308,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) rangeTableEntryErrorDetail = "Unrecognized range table entry."; } - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modifications"), - errdetail("%s", rangeTableEntryErrorDetail))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given " + "modifications", + rangeTableEntryErrorDetail, + NULL); } } @@ -1291,11 +1323,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) */ if (commandType != CMD_INSERT && queryTableCount != 1) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("Joins are not supported in distributed " - "modifications."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given" + " modification", + "Joins are not supported in distributed " + "modifications.", + NULL); } /* reject queries which involve multi-row inserts */ @@ -1308,11 +1341,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) * with a constant, and if you're inserting multiple 
rows at once the function * should return a different value for each row. */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("Multi-row INSERTs to distributed tables are not " - "supported."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning for the given" + " modification", + "Multi-row INSERTs to distributed tables are not " + "supported.", + NULL); } if (commandType == CMD_INSERT || commandType == CMD_UPDATE || @@ -1347,9 +1381,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (commandType == CMD_UPDATE && contain_volatile_functions((Node *) targetEntry->expr)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in UPDATE queries on distributed " - "tables must not be VOLATILE"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in UPDATE queries on distributed " + "tables must not be VOLATILE", + NULL, NULL); } if (commandType == CMD_UPDATE && targetEntryPartitionColumn && @@ -1362,9 +1397,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (commandType == CMD_INSERT && targetEntryPartitionColumn && !IsA(targetEntry->expr, Const)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("values given for the partition column must be" - " constants or constant expressions"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "values given for the partition column must be" + " constants or constant expressions", + NULL, NULL); } if (commandType == CMD_UPDATE && @@ -1379,10 +1415,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { if (contain_volatile_functions(joinTree->quals)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in the WHERE clause of " - "modification queries on distributed tables " - "must not be VOLATILE"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + 
"functions used in the WHERE clause of modification " + "queries on distributed tables must not be VOLATILE", + NULL, NULL); } else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, &hasBadCoalesce)) @@ -1393,23 +1429,26 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (hasVarArgument) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("STABLE functions used in UPDATE queries" - " cannot be called with column references"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "STABLE functions used in UPDATE queries " + "cannot be called with column references", + NULL, NULL); } if (hasBadCoalesce) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("non-IMMUTABLE functions are not allowed in CASE or" - " COALESCE statements"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in CASE or " + "COALESCE statements", + NULL, NULL); } if (contain_mutable_functions((Node *) queryTree->returningList)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("non-IMMUTABLE functions are not allowed in the" - " RETURNING clause"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "non-IMMUTABLE functions are not allowed in the " + "RETURNING clause", + NULL, NULL); } } @@ -1470,10 +1509,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } else if (contain_mutable_functions((Node *) setTargetEntry->expr)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in the DO UPDATE SET clause of " - "INSERTs on distributed tables must be marked " - "IMMUTABLE"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in the DO UPDATE SET clause of " + "INSERTs on distributed tables must be marked " + "IMMUTABLE", + NULL, NULL); } } } @@ -1482,17 +1522,22 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (contain_mutable_functions((Node *) arbiterWhere) || contain_mutable_functions((Node *) 
onConflictWhere)) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in the WHERE clause of the ON CONFLICT " - "clause of INSERTs on distributed tables must be marked " - "IMMUTABLE"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "functions used in the WHERE clause of the " + "ON CONFLICT clause of INSERTs on distributed " + "tables must be marked IMMUTABLE", + NULL, NULL); } if (specifiesPartitionValue) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("modifying the partition value of rows is not allowed"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifying the partition value of rows is not " + "allowed", + NULL, NULL); } + + return NULL; } @@ -2579,7 +2624,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) * the same node. Router plannable checks for select queries can be turned off * by setting citus.enable_router_execution flag to false. */ -bool +static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext) { CmdType commandType = query->commandType; @@ -3021,19 +3066,13 @@ InstantiatePartitionQual(Node *node, void *context) * ErrorIfQueryHasModifyingCTE checks if the query contains modifying common table * expressions and errors out if it does. 
*/ -static void +static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree) { ListCell *cteCell = NULL; Assert(queryTree->commandType == CMD_SELECT); - /* we do not need to do anything if there are no CTEs */ - if (queryTree->cteList == NIL) - { - return; - } - foreach(cteCell, queryTree->cteList) { CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell); @@ -3047,11 +3086,13 @@ ErrorIfQueryHasModifyingCTE(Query *queryTree) */ if (cteQuery->commandType != CMD_SELECT) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given " - "modification"), - errdetail("Data-modifying statements are not supported in " - "the WITH clauses of distributed queries."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "data-modifying statements are not supported in " + "the WITH clauses of distributed queries", + NULL, NULL); } } + + /* everything OK */ + return NULL; } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 28b943848..0e9ed1aa6 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -280,6 +280,7 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(masterQuery); WRITE_STRING_FIELD(masterTableName); WRITE_BOOL_FIELD(routerExecutable); + WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 149ae0492..4d51ae86d 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -187,6 +187,7 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(masterQuery); READ_STRING_FIELD(masterTableName); READ_BOOL_FIELD(routerExecutable); + READ_NODE_FIELD(planningError); READ_DONE(); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 0905a3b15..61f5baf73 100644 --- 
a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "distributed/citus_nodes.h" +#include "distributed/errormessage.h" #include "distributed/master_metadata_utility.h" #include "distributed/multi_logical_planner.h" #include "lib/stringinfo.h" @@ -215,6 +216,13 @@ typedef struct MultiPlan Query *masterQuery; char *masterTableName; bool routerExecutable; + + /* + * NULL if this a valid plan, an error description otherwise. This will + * e.g. be set if SQL features are present that a planner doesn't support, + * or if prepared statement parameters prevented successful planning. + */ + DeferredErrorMessage *planningError; } MultiPlan; diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index a425232fa..baee8a064 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -56,5 +56,6 @@ struct MultiPlan; extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); +extern bool IsModifyCommand(Query *query); #endif /* MULTI_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 9fac89427..86d69a258 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -14,6 +14,7 @@ #include "c.h" +#include "distributed/errormessage.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" @@ -28,10 +29,13 @@ extern bool EnableRouterExecution; -extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext); +extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query, + 
RelationRestrictionContext *restrictionContext); +extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext); + extern void AddUninstantiatedPartitionRestriction(Query *originalQuery); -extern void ErrorIfModifyQueryNotSupported(Query *queryTree); +extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree); extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte, RangeTblEntry *subqueryRte); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 491406d55..f28f6f472 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -177,16 +177,14 @@ SELECT user_id, (random()*10)::int FROM raw_events_first; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +ERROR: volatile functions are not allowed in INSERT ... SELECT queries INSERT INTO raw_events_second (user_id, value_1) WITH sub_cte AS (SELECT (random()*10)::int) SELECT user_id, (SELECT * FROM sub_cte) FROM raw_events_first; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +ERROR: volatile functions are not allowed in INSERT ... SELECT queries -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -1098,8 +1096,7 @@ INSERT INTO agg_events (value_1_agg, user_id) DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot perform distributed planning for the given modification -DETAIL: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries +ERROR: DISTINCT ON clauses are not allowed in INSERT ... 
SELECT queries -- We do not support some CTEs WITH fist_table_agg AS (SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id) @@ -1112,7 +1109,7 @@ INSERT INTO agg_events DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -- We do support some CTEs INSERT INTO agg_events @@ -1156,8 +1153,7 @@ FROM DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot perform distributed planning for the given modification -DETAIL: Set operations are not allowed in INSERT ... SELECT queries +ERROR: set operations are not allowed in INSERT ... SELECT queries -- We do not support any set operations INSERT INTO raw_events_first(user_id) @@ -1166,8 +1162,7 @@ INSERT INTO DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot perform distributed planning for the given modification -DETAIL: Set operations are not allowed in INSERT ... SELECT queries +ERROR: set operations are not allowed in INSERT ... SELECT queries -- We do not support any set operations INSERT INTO raw_events_first(user_id) @@ -1179,8 +1174,7 @@ FROM DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot perform distributed planning for the given modification -DETAIL: Set operations are not allowed in INSERT ... 
SELECT queries +ERROR: set operations are not allowed in INSERT ... SELECT queries -- unsupported JOIN INSERT INTO agg_events (value_4_agg, @@ -1219,7 +1213,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. -- error cases -- no part column at all @@ -1230,7 +1224,8 @@ FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because the query doesn't include the target table's partition column +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match +DETAIL: the query doesn't include the target table's partition column INSERT INTO raw_events_second (value_1) SELECT user_id @@ -1238,7 +1233,8 @@ FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because the query doesn't include the target table's partition column +ERROR: INSERT INTO ... 
SELECT partition columns in the source table and subquery do not match +DETAIL: the query doesn't include the target table's partition column INSERT INTO raw_events_second (user_id) SELECT value_1 @@ -1246,7 +1242,7 @@ FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. INSERT INTO raw_events_second (user_id) @@ -1255,7 +1251,7 @@ FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an operator in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO raw_events_second @@ -1265,7 +1261,7 @@ FROM raw_events_first; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column. 
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO agg_events @@ -1284,7 +1280,7 @@ GROUP BY user_id; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an aggregation in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO agg_events @@ -1304,7 +1300,7 @@ GROUP BY user_id, DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -- tables should be co-located INSERT INTO agg_events (user_id) @@ -1315,7 +1311,7 @@ FROM DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. 
-- unsupported joins between subqueries -- we do not return bare partition column on the inner query @@ -1344,7 +1340,7 @@ ON (f.id = f2.id); DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -- the second part of the query is not routable since @@ -1428,8 +1424,7 @@ GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: -ERROR: cannot perform distributed planning for the given modification -DETAIL: Grouping sets are not allowed in INSERT ... SELECT queries +ERROR: grouping sets are not allowed in INSERT ... SELECT queries -- set back to INFO SET client_min_messages TO INFO; DEBUG: StartTransactionCommand @@ -1999,8 +1994,7 @@ FROM table_with_defaults GROUP BY store_id; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +ERROR: volatile functions are not allowed in INSERT ... SELECT queries -- do some more error/error message checks SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; @@ -2030,42 +2024,42 @@ INSERT INTO text_table (part_col) CASE WHEN part_col = 'onder' THEN 'marco' END FROM text_table ; -ERROR: cannot plan INSERT INTO ... 
SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains a case expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. 
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT part_col::text from char_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; -ERROR: cannot plan INSERT INTO ... 
SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. INSERT INTO text_table (part_col) SELECT val FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. INSERT INTO text_table (part_col) SELECT val::text FROM text_table; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 411aedef9..0f45ecf3b 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -216,8 +216,7 @@ DETAIL: Multi-row INSERTs to distributed tables are not supported. 
-- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Common table expressions are not supported in distributed modifications. +ERROR: common table expressions are not supported in distributed modifications -- test simple DELETE INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -275,8 +274,7 @@ ERROR: cannot plan queries that include both regular and partitioned relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Common table expressions are not supported in distributed modifications. +ERROR: common table expressions are not supported in distributed modifications -- cursors are not supported DELETE FROM limit_orders WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard @@ -421,8 +419,7 @@ ERROR: cannot plan queries that include both regular and partitioned relations -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Common table expressions are not supported in distributed modifications. 
+ERROR: common table expressions are not supported in distributed modifications SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 3ee311bc7..7ff726ce1 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1091,7 +1091,7 @@ FROM colocated_table_test, colocated_table_test_2 WHERE colocated_table_test.value_1 = colocated_table_test.value_1; -ERROR: If data inserted into a reference table, all of the participating tables in the INSERT INTO ... SELECT query should be reference tables. +ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT -- should error out, same as the above INSERT INTO reference_table_test (value_1) @@ -1101,7 +1101,7 @@ FROM colocated_table_test, reference_table_test WHERE colocated_table_test.value_1 = reference_table_test.value_1; -ERROR: If data inserted into a reference table, all of the participating tables in the INSERT INTO ... SELECT query should be reference tables. +ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT -- now, insert into the hash partitioned table and use reference -- tables in the SELECT queries INSERT INTO @@ -1146,7 +1146,7 @@ FROM WHERE colocated_table_test_2.value_4 = reference_table_test.value_4 RETURNING value_1, value_2; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. 
-- partition column value comes from reference table which should error out INSERT INTO @@ -1158,7 +1158,7 @@ FROM WHERE colocated_table_test_2.value_4 = reference_table_test.value_4 RETURNING value_1, value_2; -ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match +ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -- some tests for mark_tables_colocated -- should error out diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 46f9e34a3..339bcb764 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -444,8 +444,7 @@ WITH new_article AS ( INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * ) SELECT * FROM new_article; -ERROR: cannot perform distributed planning for the given modification -DETAIL: Data-modifying statements are not supported in the WITH clauses of distributed queries. 
+ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries -- Modifying statement in nested CTE case is covered by PostgreSQL itself WITH new_article AS ( WITH nested_cte AS ( diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 98077f51a..c9b0f3cd2 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -92,8 +92,7 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Common table expressions are not supported in distributed modifications. +ERROR: common table expressions are not supported in distributed modifications -- Check that we can successfully delete from multiple shards with 1PC SET citus.multi_shard_commit_protocol TO '1pc'; SELECT count(*) FROM multi_shard_modify_test; @@ -234,8 +233,7 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name= ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Common table expressions are not supported in distributed modifications. 
+ERROR: common table expressions are not supported in distributed modifications -- updates referencing just a var are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=t_key WHERE t_key = 10'); master_modify_multiple_shards diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 10106d088..8038b7087 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -237,8 +237,7 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL -- subquery in the SET clause INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = (SELECT count(*) from upsert_test); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Subqueries are not supported in distributed modifications. +ERROR: subqueries are not supported in distributed modifications -- non mutable function call in the SET INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = random()::int; From 6939cb8c561f896ece0a28b48cae7d8cb9f26ed3 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 16 Jan 2017 18:19:51 -0800 Subject: [PATCH 5/5] Hack up PREPARE/EXECUTE for nearly all distributed queries. All router, real-time, task-tracker plannable queries should now have full prepared statement support (and even use router when possible), unless they don't go through the custom plan interface (which basically just affects LANGUAGE SQL (not plpgsql) functions). This is achieved by forcing postgres' planner to always choose a custom plan, by assigning very low costs to plans with bound parameters (i.e. ones where the postgres planner replanned the query upon EXECUTE with all parameter values provided), instead of the generic one.
This requires some trickery, because for custom plans to work the costs for a non-custom plan have to be known, which means we can't error out when planning the generic plan. Instead we have to return a "faux" plan, that'd trigger an error message if executed. But due to the custom plan logic that plan will likely (unless called by an SQL function, or because we can't support that query for some reason) not be executed; instead the custom plan will be chosen. --- .../distributed/executor/multi_executor.c | 3 + .../distributed/planner/multi_explain.c | 3 + .../distributed/planner/multi_planner.c | 175 ++++++++++++++++-- src/include/distributed/multi_planner.h | 1 + src/test/regress/expected/multi_explain.out | 7 + src/test/regress/expected/multi_explain_0.out | 7 + .../regress/expected/multi_prepare_plsql.out | 110 +++++++---- .../regress/expected/multi_prepare_sql.out | 87 ++++++--- .../regress/expected/multi_sql_function.out | 4 +- src/test/regress/sql/multi_explain.sql | 5 + src/test/regress/sql/multi_prepare_plsql.sql | 43 ++--- src/test/regress/sql/multi_prepare_sql.sql | 38 +--- 12 files changed, 354 insertions(+), 129 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 9d2a8ea61..061f00619 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -52,6 +52,9 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; Job *workerJob = multiPlan->workerJob; + /* ensure plan is executable */ + VerifyMultiPlanValidity(multiPlan); + ExecCheckRTPerms(planStatement->rtable, true); executorType = JobExecutorType(multiPlan); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index fd4f59cef..46d410c90 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ 
b/src/backend/distributed/planner/multi_explain.c @@ -167,6 +167,9 @@ MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into, multiPlan = GetMultiPlan(plan); + /* ensure plan is executable */ + VerifyMultiPlanValidity(multiPlan); + if (!ExplainDistributedQueries) { appendStringInfo(es->str, "explain statements for distributed queries "); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 28a77fe77..ca8ea5769 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -25,6 +25,7 @@ #include "executor/executor.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" #include "optimizer/planner.h" @@ -42,11 +43,13 @@ static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, + ParamListInfo boundParams, RelationRestrictionContext * restrictionContext); static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void); static void PopRestrictionContext(void); +static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); /* Distributed planner hook */ @@ -103,7 +106,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) if (needsDistributedPlanning) { result = CreateDistributedPlan(result, originalQuery, parse, - restrictionContext); + boundParams, restrictionContext); } } PG_CATCH(); @@ -139,29 +142,49 @@ IsModifyCommand(Query *query) } +/* + * VerifyMultiPlanValidity verifies that multiPlan is ready for execution, or + * errors out if not. + * + * A plan may e.g. not be ready for execution because CreateDistributedPlan() + * couldn't find a plan due to unresolved prepared statement parameters, but + * didn't error out, because we expect custom plans to come to our rescue. 
+ * But sql (not plpgsql) functions unfortunately don't go through a codepath + * supporting custom plans. + */ +void +VerifyMultiPlanValidity(MultiPlan *multiPlan) +{ + if (multiPlan->planningError) + { + RaiseDeferredError(multiPlan->planningError, ERROR); + } +} + + /* * CreateDistributedPlan encapsulates the logic needed to transform a particular * query into a distributed plan. */ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query, + ParamListInfo boundParams, RelationRestrictionContext *restrictionContext) { MultiPlan *distributedPlan = NULL; + PlannedStmt *resultPlan = NULL; + bool hasUnresolvedParams = false; + + if (HasUnresolvedExternParamsWalker((Node *) query, boundParams)) + { + hasUnresolvedParams = true; + } if (IsModifyCommand(query)) { - /* - * Modifications are always routed through the same - * planner/executor. As there's currently no other way to plan these, - * error out if the query is unsupported. - */ + /* modifications are always routed through the same planner/executor */ distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext); Assert(distributedPlan); - if (distributedPlan->planningError) - { - RaiseDeferredError(distributedPlan->planningError, ERROR); - } } else { @@ -182,8 +205,13 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } } - /* router didn't yield a plan, try the full distributed planner */ - if (!distributedPlan || distributedPlan->planningError) + /* + * Router didn't yield a plan, try the full distributed planner. As + * real-time/task-tracker don't support prepared statement parameters, + * skip planning in that case (we'll later trigger an error in that + * case if necessary). 
+ */ + if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams) { /* Create and optimize logical plan */ MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query); @@ -206,8 +234,62 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } } + /* + * If no plan was generated, prepare a generic error to be emitted. + * Normally this error message will never be returned to the user, as it's + * usually due to unresolved prepared statement parameters - in that case + * the logic below will force a custom plan (i.e. with parameters bound to + * specific values) to be generated. But sql (not plpgsql) functions + * unfortunately don't go through a codepath supporting custom plans - so + * we still need to have an error prepared. + */ + if (!distributedPlan) + { + /* currently always should have a more specific error otherwise */ + Assert(hasUnresolvedParams); + distributedPlan = CitusMakeNode(MultiPlan); + distributedPlan->planningError = + DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "could not create distributed plan", + "Possibly this is caused by the use of parameters in SQL " + "functions, which is not supported in Citus.", + "Consider using PLPGSQL functions instead."); + } + + /* + * Error out if none of the planners resulted in a usable plan, unless the + * error was possibly triggered by missing parameters. In that case we'll + * not error out here, but instead rely on postgres' custom plan logic. + * Postgres re-plans prepared statements the first five executions + * (i.e. it produces custom plans), after that the cost of a generic plan + * is compared with the average custom plan cost. We support otherwise + * unsupported prepared statement parameters by assigning an exorbitant + * cost to the unsupported query. That'll lead to the custom plan being + * chosen. But for that to be possible we can't error out here, as + * otherwise that logic is never reached.
+ */ + if (distributedPlan->planningError && !hasUnresolvedParams) + { + RaiseDeferredError(distributedPlan->planningError, ERROR); + } + /* store required data into the planned statement */ - return MultiQueryContainerNode(localPlan, distributedPlan); + resultPlan = MultiQueryContainerNode(localPlan, distributedPlan); + + /* + * As explained above, force planning costs to be unrealistically high if + * query planning failed (possibly) due to prepared statement parameters. + */ + if (distributedPlan->planningError && hasUnresolvedParams) + { + /* + * Arbitrarily high cost, but low enough that it can be added up + * without overflowing by choose_custom_plan(). + */ + resultPlan->planTree->total_cost = FLT_MAX / 100000000; + } + + return resultPlan; } @@ -502,3 +584,70 @@ PopRestrictionContext(void) { relationRestrictionContextList = list_delete_first(relationRestrictionContextList); } + + +/* + * HasUnresolvedExternParamsWalker returns true if the passed in expression + * has external parameters that are not contained in boundParams, false + * otherwise.
+ */ +static bool +HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams) +{ + if (expression == NULL) + { + return false; + } + + if (IsA(expression, Param)) + { + Param *param = (Param *) expression; + int paramId = param->paramid; + + /* only care about user supplied parameters */ + if (param->paramkind != PARAM_EXTERN) + { + return false; + } + + /* don't care about our special parameter, it'll be removed during planning */ + if (paramId == UNINSTANTIATED_PARAMETER_ID) + { + return false; + } + + /* check whether parameter is available (and valid) */ + if (boundParams && paramId > 0 && paramId <= boundParams->numParams) + { + ParamExternData *externParam = &boundParams->params[paramId - 1]; + + /* give hook a chance in case parameter is dynamic */ + if (!OidIsValid(externParam->ptype) && boundParams->paramFetch != NULL) + { + (*boundParams->paramFetch)(boundParams, paramId); + } + + if (OidIsValid(externParam->ptype)) + { + return false; + } + } + + return true; + } + + /* keep traversing */ + if (IsA(expression, Query)) + { + return query_tree_walker((Query *) expression, + HasUnresolvedExternParamsWalker, + boundParams, + 0); + } + else + { + return expression_tree_walker(expression, + HasUnresolvedExternParamsWalker, + boundParams); + } +} diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index baee8a064..1c1d7a337 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -57,5 +57,6 @@ extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); extern bool IsModifyCommand(Query *query); +extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); #endif /* MULTI_PLANNER_H */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index a71965210..bafe813bb 100644 --- 
a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -727,3 +727,10 @@ Distributed Query into pg_merge_job_570039 Master Query -> Aggregate -> Seq Scan on pg_merge_job_570039 +-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but +-- at least make sure to fail without crashing +PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; +EXPLAIN EXECUTE router_executor_query_param(5); +ERROR: could not create distributed plan +DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. +HINT: Consider using PLPGSQL functions instead. diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 9e414231e..7425b80e3 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -698,3 +698,10 @@ Distributed Query into pg_merge_job_570039 Master Query -> Aggregate -> Seq Scan on pg_merge_job_570039 +-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but +-- at least make sure to fail without crashing +PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; +EXPLAIN EXECUTE router_executor_query_param(5); +ERROR: could not create distributed plan +DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. +HINT: Consider using PLPGSQL functions instead. 
diff --git a/src/test/regress/expected/multi_prepare_plsql.out b/src/test/regress/expected/multi_prepare_plsql.out index 3490beed7..837ab4c2d 100644 --- a/src/test/regress/expected/multi_prepare_plsql.out +++ b/src/test/regress/expected/multi_prepare_plsql.out @@ -265,9 +265,18 @@ SELECT plpgsql_test_2(); (1 row) -- run PL/pgsql functions with different parameters --- FIXME: temporarily disabled, waiting for proper parametrized query support --- SELECT plpgsql_test_6(155); --- SELECT plpgsql_test_6(1555); +SELECT plpgsql_test_6(155); + plpgsql_test_6 +---------------- + 11811 +(1 row) + +SELECT plpgsql_test_6(1555); + plpgsql_test_6 +---------------- + 10183 +(1 row) + -- test router executor parameterized PL/pgsql functions CREATE TABLE plpgsql_table ( key int, @@ -365,9 +374,11 @@ SELECT single_parameter_insert(5); (1 row) SELECT single_parameter_insert(6); -ERROR: values given for the partition column must be constants or constant expressions -CONTEXT: SQL statement "INSERT INTO plpgsql_table (key) VALUES (key_arg)" -PL/pgSQL function single_parameter_insert(integer) line 3 at SQL statement + single_parameter_insert +------------------------- + +(1 row) + CREATE FUNCTION double_parameter_insert(key_arg int, value_arg int) RETURNS void as $$ BEGIN @@ -406,9 +417,11 @@ SELECT double_parameter_insert(5, 50); (1 row) SELECT double_parameter_insert(6, 60); -ERROR: values given for the partition column must be constants or constant expressions -CONTEXT: SQL statement "INSERT INTO plpgsql_table (key, value) VALUES (key_arg, value_arg)" -PL/pgSQL function double_parameter_insert(integer,integer) line 3 at SQL statement + double_parameter_insert +------------------------- + +(1 row) + CREATE FUNCTION non_partition_parameter_insert(value_arg int) RETURNS void as $$ BEGIN @@ -478,7 +491,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value; 4 | 5 | 50 5 | -(22 rows) + 6 | 60 + 6 | +(24 rows) -- check router executor select CREATE FUNCTION 
router_partition_column_select(key_arg int) @@ -498,6 +513,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT router_partition_column_select(1); router_partition_column_select -------------------------------- @@ -533,9 +549,13 @@ SELECT router_partition_column_select(5); (5,) (2 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- SELECT router_partition_column_select(6); +SELECT router_partition_column_select(6); + router_partition_column_select +-------------------------------- + (6,60) + (6,) +(2 rows) + CREATE FUNCTION router_non_partition_column_select(value_arg int) RETURNS TABLE(key int, value int) AS $$ DECLARE @@ -554,6 +574,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT router_non_partition_column_select(10); router_non_partition_column_select ------------------------------------ @@ -608,6 +629,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT real_time_non_partition_column_select(10); real_time_non_partition_column_select --------------------------------------- @@ -643,9 +665,13 @@ SELECT real_time_non_partition_column_select(50); (5,50) (2 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- SELECT real_time_non_partition_column_select(60); +SELECT real_time_non_partition_column_select(60); + real_time_non_partition_column_select +--------------------------------------- + (0,60) + (6,60) +(2 rows) + CREATE FUNCTION real_time_partition_column_select(key_arg int) RETURNS TABLE(key int, value int) AS $$ DECLARE @@ -664,6 +690,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT real_time_partition_column_select(1); real_time_partition_column_select ----------------------------------- @@ -708,9 +735,15 @@ SELECT real_time_partition_column_select(5); (5,) (4 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- SELECT real_time_partition_column_select(6); +SELECT real_time_partition_column_select(6); + real_time_partition_column_select +----------------------------------- + (0,10) + (1,10) + (6,60) + (6,) +(4 rows) + -- check task-tracker executor SET citus.task_executor_type TO 'task-tracker'; CREATE FUNCTION task_tracker_non_partition_column_select(value_arg int) @@ -730,6 +763,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT task_tracker_non_partition_column_select(10); task_tracker_non_partition_column_select ------------------------------------------ @@ -765,9 +799,13 @@ SELECT task_tracker_non_partition_column_select(50); (5,50) (2 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- SELECT real_time_non_partition_column_select(60); +SELECT real_time_non_partition_column_select(60); + real_time_non_partition_column_select +--------------------------------------- + (0,60) + (6,60) +(2 rows) + CREATE FUNCTION task_tracker_partition_column_select(key_arg int) RETURNS TABLE(key int, value int) AS $$ DECLARE @@ -786,6 +824,7 @@ BEGIN value; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT task_tracker_partition_column_select(1); task_tracker_partition_column_select -------------------------------------- @@ -830,9 +869,15 @@ SELECT task_tracker_partition_column_select(5); (5,) (4 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- SELECT task_tracker_partition_column_select(6); +SELECT task_tracker_partition_column_select(6); + task_tracker_partition_column_select +-------------------------------------- + (0,10) + (1,10) + (6,60) + (6,) +(4 rows) + SET citus.task_executor_type TO 'real-time'; -- check updates CREATE FUNCTION partition_parameter_update(int, int) RETURNS void as $$ @@ -871,8 +916,7 @@ SELECT partition_parameter_update(5, 51); (1 row) --- This fails with an unexpected error message -SELECT partition_parameter_update(5, 52); +SELECT partition_parameter_update(6, 61); ERROR: distributed modifications must target exactly one shard DETAIL: This command modifies all shards. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. 
@@ -946,7 +990,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value; 4 | 41 5 | 51 5 | 51 -(22 rows) + 6 | 60 + 6 | +(24 rows) -- check deletes CREATE FUNCTION partition_parameter_delete(int, int) RETURNS void as $$ @@ -954,6 +1000,7 @@ BEGIN DELETE FROM plpgsql_table WHERE key = $1 AND value = $2; END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT partition_parameter_delete(1, 11); partition_parameter_delete ---------------------------- @@ -984,8 +1031,7 @@ SELECT partition_parameter_delete(5, 51); (1 row) --- This fails with an unexpected error message -SELECT partition_parameter_delete(0, 10); +SELECT partition_parameter_delete(6, 61); ERROR: distributed modifications must target exactly one shard DETAIL: This command modifies all shards. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. @@ -1043,7 +1089,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value; 0 | 0 | 0 | -(6 rows) + 6 | 60 + 6 | +(8 rows) -- check whether we can handle execute parameters CREATE TABLE execute_parameter_test (key int, val date); diff --git a/src/test/regress/expected/multi_prepare_sql.out b/src/test/regress/expected/multi_prepare_sql.out index 5bac00df5..8f7e7424d 100644 --- a/src/test/regress/expected/multi_prepare_sql.out +++ b/src/test/regress/expected/multi_prepare_sql.out @@ -240,8 +240,12 @@ EXECUTE prepared_test_6(155); 11811 (1 row) --- FIXME: temporarily disabled --- EXECUTE prepared_test_6(1555); +EXECUTE prepared_test_6(1555); + count +------- + 10183 +(1 row) + -- test router executor with parameterized non-partition columns -- create a custom type which also exists on worker nodes CREATE TYPE test_composite_type AS ( @@ -325,17 +329,16 @@ EXECUTE prepared_select(6, 60); 1 (1 row) --- test that we don't crash on failing parameterized insert on the partition column +-- Test that parameterized partition column for an insert 
is supported PREPARE prepared_partition_column_insert(bigint) AS INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)'); --- we error out on the 6th execution +-- execute 6 times to trigger prepared statement usage EXECUTE prepared_partition_column_insert(1); EXECUTE prepared_partition_column_insert(2); EXECUTE prepared_partition_column_insert(3); EXECUTE prepared_partition_column_insert(4); EXECUTE prepared_partition_column_insert(5); EXECUTE prepared_partition_column_insert(6); -ERROR: values given for the partition column must be constants or constant expressions DROP TYPE test_composite_type CASCADE; NOTICE: drop cascades to table router_executor_table column stats -- test router executor with prepare statements @@ -373,7 +376,6 @@ EXECUTE prepared_single_parameter_insert(3); EXECUTE prepared_single_parameter_insert(4); EXECUTE prepared_single_parameter_insert(5); EXECUTE prepared_single_parameter_insert(6); -ERROR: values given for the partition column must be constants or constant expressions PREPARE prepared_double_parameter_insert(int, int) AS INSERT INTO prepare_table (key, value) VALUES ($1, $2); -- execute 6 times to trigger prepared statement usage @@ -383,7 +385,6 @@ EXECUTE prepared_double_parameter_insert(3, 30); EXECUTE prepared_double_parameter_insert(4, 40); EXECUTE prepared_double_parameter_insert(5, 50); EXECUTE prepared_double_parameter_insert(6, 60); -ERROR: values given for the partition column must be constants or constant expressions PREPARE prepared_non_partition_parameter_insert(int) AS INSERT INTO prepare_table (key, value) VALUES (0, $1); -- execute 6 times to trigger prepared statement usage @@ -419,7 +420,9 @@ SELECT * FROM prepare_table ORDER BY key, value; 4 | 5 | 50 5 | -(22 rows) + 6 | 60 + 6 | +(24 rows) -- check router executor select PREPARE prepared_router_partition_column_select(int) AS @@ -468,9 +471,13 @@ EXECUTE prepared_router_partition_column_select(5); 5 | (2 rows) --- FIXME: 6th execution is failing. 
We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_router_partition_column_select(6); +EXECUTE prepared_router_partition_column_select(6); + key | value +-----+------- + 6 | 60 + 6 | +(2 rows) + PREPARE prepared_router_non_partition_column_select(int) AS SELECT prepare_table.key, @@ -566,9 +573,13 @@ EXECUTE prepared_real_time_non_partition_column_select(50); 5 | 50 (2 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_real_time_non_partition_column_select(60); +EXECUTE prepared_real_time_non_partition_column_select(60); + key | value +-----+------- + 0 | 60 + 6 | 60 +(2 rows) + PREPARE prepared_real_time_partition_column_select(int) AS SELECT prepare_table.key, @@ -625,9 +636,15 @@ EXECUTE prepared_real_time_partition_column_select(5); 5 | (4 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_real_time_partition_column_select(6); +EXECUTE prepared_real_time_partition_column_select(6); + key | value +-----+------- + 0 | 10 + 1 | 10 + 6 | 60 + 6 | +(4 rows) + -- check task-tracker executor SET citus.task_executor_type TO 'task-tracker'; PREPARE prepared_task_tracker_non_partition_column_select(int) AS @@ -676,9 +693,13 @@ EXECUTE prepared_task_tracker_non_partition_column_select(50); 5 | 50 (2 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- EXECUTE prepared_task_tracker_non_partition_column_select(60); +EXECUTE prepared_task_tracker_non_partition_column_select(60); + key | value +-----+------- + 0 | 60 + 6 | 60 +(2 rows) + PREPARE prepared_task_tracker_partition_column_select(int) AS SELECT prepare_table.key, @@ -735,9 +756,15 @@ EXECUTE prepared_task_tracker_partition_column_select(5); 5 | (4 rows) --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_task_tracker_partition_column_select(6); +EXECUTE prepared_task_tracker_partition_column_select(6); + key | value +-----+------- + 0 | 10 + 1 | 10 + 6 | 60 + 6 | +(4 rows) + SET citus.task_executor_type TO 'real-time'; -- check updates PREPARE prepared_partition_parameter_update(int, int) AS @@ -748,8 +775,7 @@ EXECUTE prepared_partition_parameter_update(2, 21); EXECUTE prepared_partition_parameter_update(3, 31); EXECUTE prepared_partition_parameter_update(4, 41); EXECUTE prepared_partition_parameter_update(5, 51); --- This fails with an unexpected error message -EXECUTE prepared_partition_parameter_update(5, 52); +EXECUTE prepared_partition_parameter_update(6, 61); ERROR: distributed modifications must target exactly one shard DETAIL: This command modifies all shards. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. 
@@ -788,7 +814,9 @@ SELECT * FROM prepare_table ORDER BY key, value; 4 | 41 5 | 51 5 | 51 -(22 rows) + 6 | 60 + 6 | +(24 rows) -- check deletes PREPARE prepared_partition_parameter_delete(int, int) AS @@ -798,8 +826,7 @@ EXECUTE prepared_partition_parameter_delete(2, 21); EXECUTE prepared_partition_parameter_delete(3, 31); EXECUTE prepared_partition_parameter_delete(4, 41); EXECUTE prepared_partition_parameter_delete(5, 51); --- This fails with an unexpected error message -EXECUTE prepared_partition_parameter_delete(0, 10); +EXECUTE prepared_partition_parameter_delete(6, 61); ERROR: distributed modifications must target exactly one shard DETAIL: This command modifies all shards. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. @@ -822,7 +849,9 @@ SELECT * FROM prepare_table ORDER BY key, value; 0 | 0 | 0 | -(6 rows) + 6 | 60 + 6 | +(8 rows) -- verify placement state updates invalidate shard state -- diff --git a/src/test/regress/expected/multi_sql_function.out b/src/test/regress/expected/multi_sql_function.out index e6882798d..5fea10339 100644 --- a/src/test/regress/expected/multi_sql_function.out +++ b/src/test/regress/expected/multi_sql_function.out @@ -315,7 +315,9 @@ SELECT * FROM prepare_table ORDER BY key, value; 0 | 0 | 0 | -(6 rows) + 6 | 60 + 6 | +(8 rows) DROP TABLE temp_table; -- clean-up functions diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 9c445596c..c7874a38a 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -227,3 +227,8 @@ EXPLAIN EXECUTE router_executor_query; PREPARE real_time_executor_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; + +-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but +-- at least make sure to fail without crashing 
+PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; +EXPLAIN EXECUTE router_executor_query_param(5); diff --git a/src/test/regress/sql/multi_prepare_plsql.sql b/src/test/regress/sql/multi_prepare_plsql.sql index cb1bab581..5018c2cb8 100644 --- a/src/test/regress/sql/multi_prepare_plsql.sql +++ b/src/test/regress/sql/multi_prepare_plsql.sql @@ -183,9 +183,8 @@ SELECT plpgsql_test_1(); SELECT plpgsql_test_2(); -- run PL/pgsql functions with different parameters --- FIXME: temporarily disabled, waiting for proper parametrized query support --- SELECT plpgsql_test_6(155); --- SELECT plpgsql_test_6(1555); +SELECT plpgsql_test_6(155); +SELECT plpgsql_test_6(1555); -- test router executor parameterized PL/pgsql functions CREATE TABLE plpgsql_table ( @@ -276,15 +275,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT router_partition_column_select(1); SELECT router_partition_column_select(2); SELECT router_partition_column_select(3); SELECT router_partition_column_select(4); SELECT router_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- SELECT router_partition_column_select(6); +SELECT router_partition_column_select(6); CREATE FUNCTION router_non_partition_column_select(value_arg int) RETURNS TABLE(key int, value int) AS $$ @@ -305,6 +302,7 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT router_non_partition_column_select(10); SELECT router_non_partition_column_select(20); SELECT router_non_partition_column_select(30); @@ -331,15 +329,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT real_time_non_partition_column_select(10); SELECT real_time_non_partition_column_select(20); SELECT real_time_non_partition_column_select(30); SELECT real_time_non_partition_column_select(40); SELECT real_time_non_partition_column_select(50); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- SELECT real_time_non_partition_column_select(60); +SELECT real_time_non_partition_column_select(60); CREATE FUNCTION real_time_partition_column_select(key_arg int) RETURNS TABLE(key int, value int) AS $$ @@ -360,15 +356,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT real_time_partition_column_select(1); SELECT real_time_partition_column_select(2); SELECT real_time_partition_column_select(3); SELECT real_time_partition_column_select(4); SELECT real_time_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- SELECT real_time_partition_column_select(6); +SELECT real_time_partition_column_select(6); -- check task-tracker executor SET citus.task_executor_type TO 'task-tracker'; @@ -391,15 +385,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT task_tracker_non_partition_column_select(10); SELECT task_tracker_non_partition_column_select(20); SELECT task_tracker_non_partition_column_select(30); SELECT task_tracker_non_partition_column_select(40); SELECT task_tracker_non_partition_column_select(50); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- SELECT real_time_non_partition_column_select(60); +SELECT real_time_non_partition_column_select(60); CREATE FUNCTION task_tracker_partition_column_select(key_arg int) RETURNS TABLE(key int, value int) AS $$ @@ -420,15 +412,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT task_tracker_partition_column_select(1); SELECT task_tracker_partition_column_select(2); SELECT task_tracker_partition_column_select(3); SELECT task_tracker_partition_column_select(4); SELECT task_tracker_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- SELECT task_tracker_partition_column_select(6); +SELECT task_tracker_partition_column_select(6); SET citus.task_executor_type TO 'real-time'; @@ -445,8 +435,7 @@ SELECT partition_parameter_update(2, 21); SELECT partition_parameter_update(3, 31); SELECT partition_parameter_update(4, 41); SELECT partition_parameter_update(5, 51); --- This fails with an unexpected error message -SELECT partition_parameter_update(5, 52); +SELECT partition_parameter_update(6, 61); CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$ BEGIN @@ -472,13 +461,13 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- execute 6 times to trigger prepared statement usage SELECT partition_parameter_delete(1, 11); SELECT partition_parameter_delete(2, 21); SELECT partition_parameter_delete(3, 31); SELECT partition_parameter_delete(4, 41); SELECT partition_parameter_delete(5, 51); --- This fails with an unexpected error message -SELECT partition_parameter_delete(0, 10); +SELECT partition_parameter_delete(6, 61); CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$ BEGIN diff --git a/src/test/regress/sql/multi_prepare_sql.sql b/src/test/regress/sql/multi_prepare_sql.sql index 8618b8f76..d35c5d936 100644 --- a/src/test/regress/sql/multi_prepare_sql.sql +++ b/src/test/regress/sql/multi_prepare_sql.sql @@ -149,8 +149,7 @@ EXECUTE prepared_test_2; -- execute prepared statements with different parameters EXECUTE prepared_test_6(155); --- FIXME: temporarily disabled --- EXECUTE prepared_test_6(1555); +EXECUTE prepared_test_6(1555); -- test router executor with parameterized non-partition columns @@ -194,12 +193,12 @@ EXECUTE prepared_select(4, 40); EXECUTE prepared_select(5, 50); EXECUTE prepared_select(6, 60); --- test that we don't crash on failing parameterized insert on the partition column +-- Test that parameterized partition column for an insert is supported PREPARE prepared_partition_column_insert(bigint) AS INSERT INTO router_executor_table VALUES ($1, 'arsenous', 
'(1,10)'); --- we error out on the 6th execution +-- execute 6 times to trigger prepared statement usage EXECUTE prepared_partition_column_insert(1); EXECUTE prepared_partition_column_insert(2); EXECUTE prepared_partition_column_insert(3); @@ -282,10 +281,7 @@ EXECUTE prepared_router_partition_column_select(2); EXECUTE prepared_router_partition_column_select(3); EXECUTE prepared_router_partition_column_select(4); EXECUTE prepared_router_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_router_partition_column_select(6); +EXECUTE prepared_router_partition_column_select(6); PREPARE prepared_router_non_partition_column_select(int) AS SELECT @@ -325,10 +321,7 @@ EXECUTE prepared_real_time_non_partition_column_select(20); EXECUTE prepared_real_time_non_partition_column_select(30); EXECUTE prepared_real_time_non_partition_column_select(40); EXECUTE prepared_real_time_non_partition_column_select(50); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_real_time_non_partition_column_select(60); +EXECUTE prepared_real_time_non_partition_column_select(60); PREPARE prepared_real_time_partition_column_select(int) AS SELECT @@ -348,10 +341,7 @@ EXECUTE prepared_real_time_partition_column_select(2); EXECUTE prepared_real_time_partition_column_select(3); EXECUTE prepared_real_time_partition_column_select(4); EXECUTE prepared_real_time_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- EXECUTE prepared_real_time_partition_column_select(6); +EXECUTE prepared_real_time_partition_column_select(6); -- check task-tracker executor SET citus.task_executor_type TO 'task-tracker'; @@ -373,10 +363,7 @@ EXECUTE prepared_task_tracker_non_partition_column_select(20); EXECUTE prepared_task_tracker_non_partition_column_select(30); EXECUTE prepared_task_tracker_non_partition_column_select(40); EXECUTE prepared_task_tracker_non_partition_column_select(50); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. --- EXECUTE prepared_task_tracker_non_partition_column_select(60); +EXECUTE prepared_task_tracker_non_partition_column_select(60); PREPARE prepared_task_tracker_partition_column_select(int) AS SELECT @@ -396,10 +383,7 @@ EXECUTE prepared_task_tracker_partition_column_select(2); EXECUTE prepared_task_tracker_partition_column_select(3); EXECUTE prepared_task_tracker_partition_column_select(4); EXECUTE prepared_task_tracker_partition_column_select(5); - --- FIXME: 6th execution is failing. We don't want to run the failing test --- because of changing output. After implementing this feature, uncomment this. 
--- EXECUTE prepared_task_tracker_partition_column_select(6); +EXECUTE prepared_task_tracker_partition_column_select(6); SET citus.task_executor_type TO 'real-time'; @@ -413,8 +397,7 @@ EXECUTE prepared_partition_parameter_update(2, 21); EXECUTE prepared_partition_parameter_update(3, 31); EXECUTE prepared_partition_parameter_update(4, 41); EXECUTE prepared_partition_parameter_update(5, 51); --- This fails with an unexpected error message -EXECUTE prepared_partition_parameter_update(5, 52); +EXECUTE prepared_partition_parameter_update(6, 61); PREPARE prepared_non_partition_parameter_update(int, int) AS UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1; @@ -439,8 +422,7 @@ EXECUTE prepared_partition_parameter_delete(2, 21); EXECUTE prepared_partition_parameter_delete(3, 31); EXECUTE prepared_partition_parameter_delete(4, 41); EXECUTE prepared_partition_parameter_delete(5, 51); --- This fails with an unexpected error message -EXECUTE prepared_partition_parameter_delete(0, 10); +EXECUTE prepared_partition_parameter_delete(6, 61); PREPARE prepared_non_partition_parameter_delete(int) AS DELETE FROM prepare_table WHERE key = 0 AND value = $1;