Merge pull request #1134 from citusdata/prepare_hack_poc

Extended Prepared Statement Support
pull/1158/head
Andres Freund 2017-01-23 09:31:33 -08:00 committed by GitHub
commit 6becd43c8d
30 changed files with 931 additions and 434 deletions

View File

@ -52,6 +52,9 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
Job *workerJob = multiPlan->workerJob; Job *workerJob = multiPlan->workerJob;
/* ensure plan is executable */
VerifyMultiPlanValidity(multiPlan);
ExecCheckRTPerms(planStatement->rtable, true); ExecCheckRTPerms(planStatement->rtable, true);
executorType = JobExecutorType(multiPlan); executorType = JobExecutorType(multiPlan);

View File

@ -139,7 +139,11 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
if (modifyQuery->commandType != CMD_UTILITY) if (modifyQuery->commandType != CMD_UTILITY)
{ {
ErrorIfModifyQueryNotSupported(modifyQuery); DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery);
if (error)
{
RaiseDeferredError(error, ERROR);
}
} }
/* reject queries with a returning list */ /* reject queries with a returning list */

View File

@ -167,6 +167,9 @@ MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into,
multiPlan = GetMultiPlan(plan); multiPlan = GetMultiPlan(plan);
/* ensure plan is executable */
VerifyMultiPlanValidity(multiPlan);
if (!ExplainDistributedQueries) if (!ExplainDistributedQueries)
{ {
appendStringInfo(es->str, "explain statements for distributed queries "); appendStringInfo(es->str, "explain statements for distributed queries ");

View File

@ -25,6 +25,7 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planner.h" #include "optimizer/planner.h"
@ -39,12 +40,16 @@ static void CheckNodeIsDumpable(Node *node);
static char * GetMultiPlanString(PlannedStmt *result); static char * GetMultiPlanString(PlannedStmt *result);
static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan); struct MultiPlan *multiPlan);
static struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query, static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan,
Query *originalQuery,
Query *query,
ParamListInfo boundParams,
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CreateAndPushRestrictionContext(void);
static RelationRestrictionContext * CurrentRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void);
static void PopRestrictionContext(void); static void PopRestrictionContext(void);
static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
/* Distributed planner hook */ /* Distributed planner hook */
@ -100,11 +105,8 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse, result = CreateDistributedPlan(result, originalQuery, parse,
restrictionContext); boundParams, restrictionContext);
/* store required data into the planned statement */
result = MultiQueryContainerNode(result, physicalPlan);
} }
} }
PG_CATCH(); PG_CATCH();
@ -122,19 +124,94 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/* /*
* CreatePhysicalPlan encapsulates the logic needed to transform a particular * IsModifyCommand returns true if the query performs modifications, false
* query into a physical plan. For modifications, queries immediately enter * otherwise.
* the physical planning stage, since they are essentially "routed" to remote
* target shards. SELECT queries go through the full logical plan/optimize/
* physical plan process needed to produce distributed query plans.
*/ */
static MultiPlan * bool
CreatePhysicalPlan(Query *originalQuery, Query *query, IsModifyCommand(Query *query)
{
CmdType commandType = query->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE || query->hasModifyingCTE)
{
return true;
}
return false;
}
/*
* VerifyMultiPlanValidity verifies that multiPlan is ready for execution, or
* errors out if not.
*
* A plan may e.g. not be ready for execution because CreateDistributedPlan()
* couldn't find a plan due to unresolved prepared statement parameters, but
* didn't error out, because we expect custom plans to come to our rescue.
* But sql (not plpgsql) functions unfortunately don't go through a codepath
* supporting custom plans.
*/
void
VerifyMultiPlanValidity(MultiPlan *multiPlan)
{
if (multiPlan->planningError)
{
RaiseDeferredError(multiPlan->planningError, ERROR);
}
}
/*
* CreateDistributedPlan encapsulates the logic needed to transform a particular
* query into a distributed plan.
*/
static PlannedStmt *
CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query,
ParamListInfo boundParams,
RelationRestrictionContext *restrictionContext) RelationRestrictionContext *restrictionContext)
{ {
MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query, MultiPlan *distributedPlan = NULL;
restrictionContext); PlannedStmt *resultPlan = NULL;
if (physicalPlan == NULL) bool hasUnresolvedParams = false;
if (HasUnresolvedExternParamsWalker((Node *) query, boundParams))
{
hasUnresolvedParams = true;
}
if (IsModifyCommand(query))
{
/* modifications are always routed through the same planner/executor */
distributedPlan = CreateModifyPlan(originalQuery, query, restrictionContext);
Assert(distributedPlan);
}
else
{
/*
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planing process needed to
* produce distributed query plans.
*/
if (EnableRouterExecution)
{
distributedPlan = CreateRouterPlan(originalQuery, query, restrictionContext);
/* for debugging it's useful to display why query was not router plannable */
if (distributedPlan && distributedPlan->planningError)
{
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
}
}
/*
* Router didn't yield a plan, try the full distributed planner. As
* real-time/task-tracker don't support prepared statement parameters,
* skip planning in that case (we'll later trigger an error in that
* case if necessary).
*/
if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams)
{ {
/* Create and optimize logical plan */ /* Create and optimize logical plan */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query); MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query);
@ -150,10 +227,69 @@ CreatePhysicalPlan(Query *originalQuery, Query *query,
CheckNodeIsDumpable((Node *) logicalPlan); CheckNodeIsDumpable((Node *) logicalPlan);
/* Create the physical plan */ /* Create the physical plan */
physicalPlan = MultiPhysicalPlanCreate(logicalPlan); distributedPlan = MultiPhysicalPlanCreate(logicalPlan);
/* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL);
}
} }
return physicalPlan; /*
* If no plan was generated, prepare a generic error to be emitted.
* Normally this error message will never returned to the user, as it's
* usually due to unresolved prepared statement parameters - in that case
* the logic below will force a custom plan (i.e. with parameters bound to
* specific values) to be generated. But sql (not plpgsql) functions
* unfortunately don't go through a codepath supporting custom plans - so
* we still need to have an error prepared.
*/
if (!distributedPlan)
{
/* currently always should have a more specific error otherwise */
Assert(hasUnresolvedParams);
distributedPlan = CitusMakeNode(MultiPlan);
distributedPlan->planningError =
DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"could not create distributed plan",
"Possibly this is caused by the use of parameters in SQL "
"functions, which is not supported in Citus.",
"Consider using PLPGSQL functions instead.");
}
/*
* Error out if none of the planners resulted in a usable plan, unless the
* error was possibly triggered by missing parameters. In that case we'll
* not error out here, but instead rely on postgres' custom plan logic.
* Postgres re-plans prepared statements the first five executions
* (i.e. it produces custom plans), after that the cost of a generic plan
* is compared with the average custom plan cost. We support otherwise
* unsupported prepared statement parameters by assigning an exorbitant
* cost to the unsupported query. That'll lead to the custom plan being
* chosen. But for that to be possible we can't error out here, as
* otherwise that logic is never reached.
*/
if (distributedPlan->planningError && !hasUnresolvedParams)
{
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
/* store required data into the planned statement */
resultPlan = MultiQueryContainerNode(localPlan, distributedPlan);
/*
* As explained above, force planning costs to be unrealistically high if
* query planning failed (possibly) due to prepared statement parameters.
*/
if (distributedPlan->planningError && hasUnresolvedParams)
{
/*
* Arbitraryly high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan().
*/
resultPlan->planTree->total_cost = FLT_MAX / 100000000;
}
return resultPlan;
} }
@ -405,7 +541,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
* CreateAndPushRestrictionContext creates a new restriction context, inserts it to the * CreateAndPushRestrictionContext creates a new restriction context, inserts it to the
* beginning of the context list, and returns the newly created context. * beginning of the context list, and returns the newly created context.
*/ */
RelationRestrictionContext * static RelationRestrictionContext *
CreateAndPushRestrictionContext(void) CreateAndPushRestrictionContext(void)
{ {
RelationRestrictionContext *restrictionContext = RelationRestrictionContext *restrictionContext =
@ -425,7 +561,7 @@ CreateAndPushRestrictionContext(void)
* CurrentRestrictionContext returns the the last restriction context from the * CurrentRestrictionContext returns the the last restriction context from the
* list. * list.
*/ */
RelationRestrictionContext * static RelationRestrictionContext *
CurrentRestrictionContext(void) CurrentRestrictionContext(void)
{ {
RelationRestrictionContext *restrictionContext = NULL; RelationRestrictionContext *restrictionContext = NULL;
@ -443,8 +579,75 @@ CurrentRestrictionContext(void)
* PopRestrictionContext removes the most recently added restriction context from * PopRestrictionContext removes the most recently added restriction context from
* context list. The function assumes the list is not empty. * context list. The function assumes the list is not empty.
*/ */
void static void
PopRestrictionContext(void) PopRestrictionContext(void)
{ {
relationRestrictionContextList = list_delete_first(relationRestrictionContextList); relationRestrictionContextList = list_delete_first(relationRestrictionContextList);
} }
/*
* HasUnresolvedExternParamsWalker returns true if the passed in expression
* has external parameters that are not contained in boundParams, false
* otherwise.
*/
static bool
HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
{
if (expression == NULL)
{
return false;
}
if (IsA(expression, Param))
{
Param *param = (Param *) expression;
int paramId = param->paramid;
/* only care about user supplied parameters */
if (param->paramkind != PARAM_EXTERN)
{
return false;
}
/* don't care about our special parameter, it'll be removed during planning */
if (paramId == UNINSTANTIATED_PARAMETER_ID)
{
return false;
}
/* check whether parameter is available (and valid) */
if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
{
ParamExternData *externParam = &boundParams->params[paramId - 1];
/* give hook a chance in case parameter is dynamic */
if (!OidIsValid(externParam->ptype) && boundParams->paramFetch != NULL)
{
(*boundParams->paramFetch)(boundParams, paramId);
}
if (OidIsValid(externParam->ptype))
{
return false;
}
}
return true;
}
/* keep traversing */
if (IsA(expression, Query))
{
return query_tree_walker((Query *) expression,
HasUnresolvedExternParamsWalker,
boundParams,
0);
}
else
{
return expression_tree_walker(expression,
HasUnresolvedExternParamsWalker,
boundParams);
}
}

View File

@ -25,6 +25,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/distribution_column.h" #include "distributed/distribution_column.h"
#include "distributed/errormessage.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -74,7 +75,8 @@ typedef struct WalkerState
bool EnableRouterExecution = true; bool EnableRouterExecution = true;
/* planner functions forward declarations */ /* planner functions forward declarations */
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
Query *query,
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery, static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery,
@ -114,58 +116,64 @@ static bool MultiRouterPlannableQuery(Query *query,
static RelationRestrictionContext * CopyRelationRestrictionContext( static RelationRestrictionContext * CopyRelationRestrictionContext(
RelationRestrictionContext *oldContext); RelationRestrictionContext *oldContext);
static Node * InstantiatePartitionQual(Node *node, void *context); static Node * InstantiatePartitionQual(Node *node, void *context);
static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree, static DeferredErrorMessage * InsertSelectQuerySupported(Query *queryTree,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *subqueryRte,
bool allReferenceTables); bool allReferenceTables);
static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query); static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *
subqueryRte,
Oid * Oid *
selectPartitionColumnTableId); selectPartitionColumnTableId);
static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar); static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar);
static void ErrorIfQueryHasModifyingCTE(Query *queryTree); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
/* /*
* MultiRouterPlanCreate creates a multi plan for the queries * CreateRouterPlan attempts to create a router executor plan for the given
* that includes the following: * SELECT statement. If planning fails either NULL is returned, or
* (i) modification queries that hit a single shard * ->planningError is set to a description of the failure.
* (ii) select queries hat can be executed on a single worker
* node and does not require any operations on the master node.
* (iii) INSERT INTO .... SELECT queries
*
* The function returns NULL if it cannot create the plan for SELECT
* queries and errors out if it cannot plan the modify queries.
*/ */
MultiPlan * MultiPlan *
MultiRouterPlanCreate(Query *originalQuery, Query *query, CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext) RelationRestrictionContext *restrictionContext)
{ {
MultiPlan *multiPlan = NULL; Assert(EnableRouterExecution);
bool routerPlannable = MultiRouterPlannableQuery(query, restrictionContext); if (MultiRouterPlannableQuery(query, restrictionContext))
if (!routerPlannable)
{ {
return CreateSingleTaskRouterPlan(originalQuery, query,
restrictionContext);
}
/*
* TODO: Instead have MultiRouterPlannableQuery set an error describing
* why router cannot support the query.
*/
return NULL; return NULL;
} }
/*
* CreateModifyPlan attempts to create a plan the given modification
* statement. If planning fails ->planningError is set to a description of
* the failure.
*/
MultiPlan *
CreateModifyPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
if (InsertSelectQuery(originalQuery)) if (InsertSelectQuery(originalQuery))
{ {
multiPlan = CreateInsertSelectRouterPlan(originalQuery, restrictionContext); return CreateInsertSelectRouterPlan(originalQuery, restrictionContext);
} }
else else
{ {
multiPlan = CreateSingleTaskRouterPlan(originalQuery, query, restrictionContext); return CreateSingleTaskRouterPlan(originalQuery, query,
restrictionContext);
} }
/* plans created by router planner are always router executable */
if (multiPlan != NULL)
{
multiPlan->routerExecutable = true;
}
return multiPlan;
} }
@ -173,8 +181,8 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns * either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete) * query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then the function * are router plannable by default. If query is not router plannable then either NULL is
* returns NULL. * returned, or the returned plan has planningError set to a description of the problem.
*/ */
static MultiPlan * static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
@ -185,7 +193,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
Job *job = NULL; Job *job = NULL;
Task *task = NULL; Task *task = NULL;
List *placementList = NIL; List *placementList = NIL;
MultiPlan *multiPlan = NULL; MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE) commandType == CMD_DELETE)
@ -195,12 +203,23 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
if (modifyTask) if (modifyTask)
{ {
ErrorIfModifyQueryNotSupported(query); /* FIXME: this should probably rather be inlined into CreateModifyPlan */
multiPlan->planningError = ModifyQuerySupported(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterModifyTask(originalQuery, query); task = RouterModifyTask(originalQuery, query);
Assert(task);
} }
else else
{ {
ErrorIfQueryHasModifyingCTE(query); /* FIXME: this should probably rather be inlined into CreateSelectPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError)
{
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList); task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
} }
@ -213,10 +232,10 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
job = RouterQueryJob(originalQuery, task, placementList); job = RouterQueryJob(originalQuery, task, placementList);
multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = job; multiPlan->workerJob = job;
multiPlan->masterQuery = NULL; multiPlan->masterQuery = NULL;
multiPlan->masterTableName = NULL; multiPlan->masterTableName = NULL;
multiPlan->routerExecutable = true;
return multiPlan; return multiPlan;
} }
@ -237,7 +256,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
Job *workerJob = NULL; Job *workerJob = NULL;
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
MultiPlan *multiPlan = NULL; MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery);
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
@ -249,8 +268,13 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
* Error semantics for INSERT ... SELECT queries are different than regular * Error semantics for INSERT ... SELECT queries are different than regular
* modify queries. Thus, handle separately. * modify queries. Thus, handle separately.
*/ */
ErrorIfInsertSelectQueryNotSupported(originalQuery, insertRte, subqueryRte, multiPlan->planningError = InsertSelectQuerySupported(originalQuery, insertRte,
subqueryRte,
allReferenceTables); allReferenceTables);
if (multiPlan->planningError)
{
return multiPlan;
}
/* /*
* Plan select query for each shard in the target table. Do so by replacing the * Plan select query for each shard in the target table. Do so by replacing the
@ -291,10 +315,10 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
/* and finally the multi plan */ /* and finally the multi plan */
multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = workerJob; multiPlan->workerJob = workerJob;
multiPlan->masterTableName = NULL; multiPlan->masterTableName = NULL;
multiPlan->masterQuery = NULL; multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
return multiPlan; return multiPlan;
} }
@ -628,11 +652,11 @@ ExtractInsertRangeTableEntry(Query *query)
/* /*
* ErrorIfInsertSelectQueryNotSupported errors out for unsupported * InsertSelectQueryNotSupported returns NULL if the INSERT ... SELECT query
* INSERT ... SELECT queries. * is supported, or a description why not.
*/ */
static void static DeferredErrorMessage *
ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, InsertSelectQuerySupported(Query *queryTree, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, bool allReferenceTables) RangeTblEntry *subqueryRte, bool allReferenceTables)
{ {
Query *subquery = NULL; Query *subquery = NULL;
@ -640,6 +664,7 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
char targetPartitionMethod = PartitionMethod(targetRelationId); char targetPartitionMethod = PartitionMethod(targetRelationId);
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
DeferredErrorMessage *error = NULL;
/* we only do this check for INSERT ... SELECT queries */ /* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree)); AssertArg(InsertSelectQuery(queryTree));
@ -653,8 +678,9 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
if (rangeTableEntry->rtekind == RTE_RELATION && if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_VIEW) rangeTableEntry->relkind == RELKIND_VIEW)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot insert into view over distributed table"))); "cannot insert into view over distributed table",
NULL, NULL);
} }
} }
@ -662,15 +688,18 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
if (contain_volatile_functions((Node *) queryTree)) if (contain_volatile_functions((Node *) queryTree))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "volatile functions are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("Volatile functions are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* we don't support LIMIT, OFFSET and WINDOW functions */ /* we don't support LIMIT, OFFSET and WINDOW functions */
ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery); error = MultiTaskRouterSelectQuerySupported(subquery);
if (error)
{
return error;
}
/* /*
* If we're inserting into a reference table, all participating tables * If we're inserting into a reference table, all participating tables
@ -680,18 +709,23 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
{ {
if (!allReferenceTables) if (!allReferenceTables)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("If data inserted into a reference table, " "only reference tables may be queried when targeting "
"all of the participating tables in the " "a reference table with INSERT ... SELECT",
"INSERT INTO ... SELECT query should be " NULL, NULL);
"reference tables.")));
} }
} }
else else
{ {
DeferredErrorMessage *error = NULL;
/* ensure that INSERT's partition column comes from SELECT's partition column */ /* ensure that INSERT's partition column comes from SELECT's partition column */
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree, insertRte, subqueryRte, error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId); &selectPartitionColumnTableId);
if (error)
{
return error;
}
/* /*
* We expect partition column values come from colocated tables. Note that we * We expect partition column values come from colocated tables. Note that we
@ -700,22 +734,23 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
*/ */
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId)) if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("INSERT target table and the source relation " "INSERT target table and the source relation of the SELECT partition "
"of the SELECT partition column value " "column value must be colocated",
"must be colocated"))); NULL, NULL);
} }
} }
return NULL;
} }
/* /*
* ErrorUnsupportedMultiTaskSelectQuery errors out on queries that we support * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
* for single task router queries, but, cannot allow for multi task router * as the source for an INSERT ... SELECT or returns a description why not.
* queries. We do these checks recursively to prevent any wrong results.
*/ */
static void static DeferredErrorMessage *
ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query) MultiTaskRouterSelectQuerySupported(Query *query)
{ {
List *queryList = NIL; List *queryList = NIL;
ListCell *queryCell = NULL; ListCell *queryCell = NULL;
@ -730,21 +765,19 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query)
/* pushing down limit per shard would yield wrong results */ /* pushing down limit per shard would yield wrong results */
if (subquery->limitCount != NULL) if (subquery->limitCount != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "LIMIT clauses are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("LIMIT clauses are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* pushing down limit offest per shard would yield wrong results */ /* pushing down limit offest per shard would yield wrong results */
if (subquery->limitOffset != NULL) if (subquery->limitOffset != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "OFFSET clauses are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("OFFSET clauses are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* /*
@ -754,21 +787,19 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query)
*/ */
if (subquery->windowClause != NULL) if (subquery->windowClause != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "window functions are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("Window functions are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* see comment on AddUninstantiatedPartitionRestriction() */ /* see comment on AddUninstantiatedPartitionRestriction() */
if (subquery->setOperations != NULL) if (subquery->setOperations != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "set operations are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("Set operations are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* /*
@ -779,11 +810,10 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query)
*/ */
if (subquery->groupingSets != NULL) if (subquery->groupingSets != NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "grouping sets are not allowed in INSERT ... SELECT "
"modification"), "queries",
errdetail("Grouping sets are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
/* /*
@ -792,25 +822,27 @@ ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query)
*/ */
if (subquery->hasDistinctOn) if (subquery->hasDistinctOn)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "DISTINCT ON clauses are not allowed in "
"modification"), "INSERT ... SELECT queries",
errdetail("DISTINCT ON clauses are not allowed in " NULL, NULL);
"INSERT ... SELECT queries")));
} }
} }
return NULL;
} }
/* /*
* ErrorIfInsertPartitionColumnDoesNotMatchSelect checks whether the INSERTed table's * InsertPartitionColumnMatchesSelect returns NULL the partition column in the
* partition column value matches with the any of the SELECTed table's partition column. * table targeted by INSERTed matches with the any of the SELECTed table's
* partition column. Returns the error description if there's no match.
* *
* On return without error (i.e., if partition columns match), the function also sets * On return without error (i.e., if partition columns match), the function
* selectPartitionColumnTableId. * also sets selectPartitionColumnTableId.
*/ */
static void static DeferredErrorMessage *
ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *insertRte, InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, RangeTblEntry *subqueryRte,
Oid *selectPartitionColumnTableId) Oid *selectPartitionColumnTableId)
{ {
@ -972,14 +1004,13 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
} }
} }
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot plan INSERT INTO ... SELECT " "INSERT INTO ... SELECT partition columns in the source "
"because partition columns in the target table " "table and subquery do not match",
"and the subquery do not match"), psprintf(errorDetailTemplate, exprDescription),
errdetail(errorDetailTemplate, exprDescription), "Ensure the target table's partition column has a "
errhint("Ensure the target table's partition column has a "
"corresponding simple column reference to a distributed " "corresponding simple column reference to a distributed "
"table's partition column in the subquery."))); "table's partition column in the subquery.");
} }
/* /*
@ -988,27 +1019,24 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
*/ */
if (!IsA(targetEntry->expr, Var)) if (!IsA(targetEntry->expr, Var))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot plan INSERT INTO ... SELECT " "INSERT INTO ... SELECT partition columns in the source "
"because partition columns in the target table " "table and subquery do not match",
"and the subquery do not match"),
errdetail(
"The data type of the target table's partition column " "The data type of the target table's partition column "
"should exactly match the data type of the " "should exactly match the data type of the "
"corresponding simple column reference in the subquery."))); "corresponding simple column reference in the subquery.",
NULL);
} }
/* finally, check that the select target column is a partition column */ /* finally, check that the select target column is a partition column */
if (!IsPartitionColumn(selectTargetExpr, subquery)) if (!IsPartitionColumn(selectTargetExpr, subquery))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot plan INSERT INTO ... SELECT " "INSERT INTO ... SELECT partition columns in the source "
"because partition columns in the target table " "table and subquery do not match",
"and the subquery do not match"), "The target table's partition column should correspond "
errdetail( "to a partition column in the subquery.",
"The target table's partition column " NULL);
"should correspond to a partition column "
"in the subquery.")));
} }
/* we can set the select relation id */ /* we can set the select relation id */
@ -1019,11 +1047,15 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
if (!targetTableHasPartitionColumn) if (!targetTableHasPartitionColumn)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot plan INSERT INTO ... SELECT " "INSERT INTO ... SELECT partition columns in the source "
"because the query doesn't include the target table's " "table and subquery do not match",
"partition column"))); "the query doesn't include the target table's "
"partition column",
NULL);
} }
return NULL;
} }
@ -1155,11 +1187,11 @@ AddUninstantiatedEqualityQual(Query *query, Var *partitionColumn)
/* /*
* ErrorIfModifyQueryNotSupported checks if the query contains unsupported features, * ModifyQuerySupported returns NULL if the query only contains supported
* and errors out if it does. * features, otherwise it returns an error description.
*/ */
void DeferredErrorMessage *
ErrorIfModifyQueryNotSupported(Query *queryTree) ModifyQuerySupported(Query *queryTree)
{ {
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
uint32 rangeTableId = 1; uint32 rangeTableId = 1;
@ -1185,21 +1217,18 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
*/ */
if (queryTree->hasSubLinks == true) if (queryTree->hasSubLinks == true)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "subqueries are not supported in distributed modifications",
" modification"), NULL, NULL);
errdetail("Subqueries are not supported in distributed"
" modifications.")));
} }
/* reject queries which include CommonTableExpr */ /* reject queries which include CommonTableExpr */
if (queryTree->cteList != NIL) if (queryTree->cteList != NIL)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "common table expressions are not supported in distributed "
" modification"), "modifications",
errdetail("Common table expressions are not supported in" NULL, NULL);
" distributed modifications.")));
} }
/* extract range table entries */ /* extract range table entries */
@ -1227,11 +1256,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (referenceTable && !schemaNode) if (referenceTable && !schemaNode)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "cannot perform distributed planning for the given"
" modification"), " modification",
errdetail("Modifications to reference tables are " "Modifications to reference tables are "
"supported only from the schema node."))); "supported only from the schema node.",
NULL);
} }
queryTableCount++; queryTableCount++;
@ -1239,8 +1269,9 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
/* we do not expect to see a view in modify query */ /* we do not expect to see a view in modify query */
if (rangeTableEntry->relkind == RELKIND_VIEW) if (rangeTableEntry->relkind == RELKIND_VIEW)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot modify views over distributed tables"))); "cannot modify views over distributed tables",
NULL, NULL);
} }
} }
else if (rangeTableEntry->rtekind == RTE_VALUES) else if (rangeTableEntry->rtekind == RTE_VALUES)
@ -1277,10 +1308,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
rangeTableEntryErrorDetail = "Unrecognized range table entry."; rangeTableEntryErrorDetail = "Unrecognized range table entry.";
} }
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "cannot perform distributed planning for the given "
" modifications"), "modifications",
errdetail("%s", rangeTableEntryErrorDetail))); rangeTableEntryErrorDetail,
NULL);
} }
} }
@ -1291,11 +1323,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
*/ */
if (commandType != CMD_INSERT && queryTableCount != 1) if (commandType != CMD_INSERT && queryTableCount != 1)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "cannot perform distributed planning for the given"
" modification"), " modification",
errdetail("Joins are not supported in distributed " "Joins are not supported in distributed "
"modifications."))); "modifications.",
NULL);
} }
/* reject queries which involve multi-row inserts */ /* reject queries which involve multi-row inserts */
@ -1308,11 +1341,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
* with a constant, and if you're inserting multiple rows at once the function * with a constant, and if you're inserting multiple rows at once the function
* should return a different value for each row. * should return a different value for each row.
*/ */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given" "cannot perform distributed planning for the given"
" modification"), " modification",
errdetail("Multi-row INSERTs to distributed tables are not " "Multi-row INSERTs to distributed tables are not "
"supported."))); "supported.",
NULL);
} }
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
@ -1347,9 +1381,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (commandType == CMD_UPDATE && if (commandType == CMD_UPDATE &&
contain_volatile_functions((Node *) targetEntry->expr)) contain_volatile_functions((Node *) targetEntry->expr))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("functions used in UPDATE queries on distributed " "functions used in UPDATE queries on distributed "
"tables must not be VOLATILE"))); "tables must not be VOLATILE",
NULL, NULL);
} }
if (commandType == CMD_UPDATE && targetEntryPartitionColumn && if (commandType == CMD_UPDATE && targetEntryPartitionColumn &&
@ -1362,9 +1397,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (commandType == CMD_INSERT && targetEntryPartitionColumn && if (commandType == CMD_INSERT && targetEntryPartitionColumn &&
!IsA(targetEntry->expr, Const)) !IsA(targetEntry->expr, Const))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("values given for the partition column must be" "values given for the partition column must be"
" constants or constant expressions"))); " constants or constant expressions",
NULL, NULL);
} }
if (commandType == CMD_UPDATE && if (commandType == CMD_UPDATE &&
@ -1379,10 +1415,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
{ {
if (contain_volatile_functions(joinTree->quals)) if (contain_volatile_functions(joinTree->quals))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("functions used in the WHERE clause of " "functions used in the WHERE clause of modification "
"modification queries on distributed tables " "queries on distributed tables must not be VOLATILE",
"must not be VOLATILE"))); NULL, NULL);
} }
else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument,
&hasBadCoalesce)) &hasBadCoalesce))
@ -1393,23 +1429,26 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (hasVarArgument) if (hasVarArgument)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("STABLE functions used in UPDATE queries" "STABLE functions used in UPDATE queries "
" cannot be called with column references"))); "cannot be called with column references",
NULL, NULL);
} }
if (hasBadCoalesce) if (hasBadCoalesce)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("non-IMMUTABLE functions are not allowed in CASE or" "non-IMMUTABLE functions are not allowed in CASE or "
" COALESCE statements"))); "COALESCE statements",
NULL, NULL);
} }
if (contain_mutable_functions((Node *) queryTree->returningList)) if (contain_mutable_functions((Node *) queryTree->returningList))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("non-IMMUTABLE functions are not allowed in the" "non-IMMUTABLE functions are not allowed in the "
" RETURNING clause"))); "RETURNING clause",
NULL, NULL);
} }
} }
@ -1470,10 +1509,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
} }
else if (contain_mutable_functions((Node *) setTargetEntry->expr)) else if (contain_mutable_functions((Node *) setTargetEntry->expr))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("functions used in the DO UPDATE SET clause of " "functions used in the DO UPDATE SET clause of "
"INSERTs on distributed tables must be marked " "INSERTs on distributed tables must be marked "
"IMMUTABLE"))); "IMMUTABLE",
NULL, NULL);
} }
} }
} }
@ -1482,17 +1522,22 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (contain_mutable_functions((Node *) arbiterWhere) || if (contain_mutable_functions((Node *) arbiterWhere) ||
contain_mutable_functions((Node *) onConflictWhere)) contain_mutable_functions((Node *) onConflictWhere))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("functions used in the WHERE clause of the ON CONFLICT " "functions used in the WHERE clause of the "
"clause of INSERTs on distributed tables must be marked " "ON CONFLICT clause of INSERTs on distributed "
"IMMUTABLE"))); "tables must be marked IMMUTABLE",
NULL, NULL);
} }
if (specifiesPartitionValue) if (specifiesPartitionValue)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("modifying the partition value of rows is not allowed"))); "modifying the partition value of rows is not "
"allowed",
NULL, NULL);
} }
return NULL;
} }
@ -2579,7 +2624,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
* the same node. Router plannable checks for select queries can be turned off * the same node. Router plannable checks for select queries can be turned off
* by setting citus.enable_router_execution flag to false. * by setting citus.enable_router_execution flag to false.
*/ */
bool static bool
MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext) MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext)
{ {
CmdType commandType = query->commandType; CmdType commandType = query->commandType;
@ -3021,19 +3066,13 @@ InstantiatePartitionQual(Node *node, void *context)
* ErrorIfQueryHasModifyingCTE checks if the query contains modifying common table * ErrorIfQueryHasModifyingCTE checks if the query contains modifying common table
* expressions and errors out if it does. * expressions and errors out if it does.
*/ */
static void static DeferredErrorMessage *
ErrorIfQueryHasModifyingCTE(Query *queryTree) ErrorIfQueryHasModifyingCTE(Query *queryTree)
{ {
ListCell *cteCell = NULL; ListCell *cteCell = NULL;
Assert(queryTree->commandType == CMD_SELECT); Assert(queryTree->commandType == CMD_SELECT);
/* we do not need to do anything if there are no CTEs */
if (queryTree->cteList == NIL)
{
return;
}
foreach(cteCell, queryTree->cteList) foreach(cteCell, queryTree->cteList)
{ {
CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell); CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
@ -3047,11 +3086,13 @@ ErrorIfQueryHasModifyingCTE(Query *queryTree)
*/ */
if (cteQuery->commandType != CMD_SELECT) if (cteQuery->commandType != CMD_SELECT)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errmsg("cannot perform distributed planning for the given " "data-modifying statements are not supported in "
"modification"), "the WITH clauses of distributed queries",
errdetail("Data-modifying statements are not supported in " NULL, NULL);
"the WITH clauses of distributed queries.")));
} }
} }
/* everything OK */
return NULL;
} }

View File

@ -13,6 +13,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/errormessage.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -33,7 +34,8 @@ static const char *CitusNodeTagNamesD[] = {
"Task", "Task",
"ShardInterval", "ShardInterval",
"ShardPlacement", "ShardPlacement",
"RelationShard" "RelationShard",
"DeferredErrorMessage"
}; };
const char **CitusNodeTagNames = CitusNodeTagNamesD; const char **CitusNodeTagNames = CitusNodeTagNamesD;
@ -383,6 +385,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(ShardPlacement), DEFINE_NODE_METHODS(ShardPlacement),
DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(Task), DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(DeferredErrorMessage),
/* nodes with only output support */ /* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode), DEFINE_NODE_METHODS_NO_READ(MultiNode),

View File

@ -22,6 +22,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/errormessage.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -279,6 +280,7 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_STRING_FIELD(masterTableName); WRITE_STRING_FIELD(masterTableName);
WRITE_BOOL_FIELD(routerExecutable); WRITE_BOOL_FIELD(routerExecutable);
WRITE_NODE_FIELD(planningError);
} }
@ -515,6 +517,23 @@ OutTask(OUTFUNC_ARGS)
WRITE_NODE_FIELD(relationShardList); WRITE_NODE_FIELD(relationShardList);
} }
void
OutDeferredErrorMessage(OUTFUNC_ARGS)
{
WRITE_LOCALS(DeferredErrorMessage);
WRITE_NODE_TYPE("DEFERREDERRORMESSAGE");
WRITE_INT_FIELD(code);
WRITE_STRING_FIELD(message);
WRITE_STRING_FIELD(detail);
WRITE_STRING_FIELD(hint);
WRITE_STRING_FIELD(filename);
WRITE_INT_FIELD(linenumber);
WRITE_STRING_FIELD(functionname);
}
#if (PG_VERSION_NUM < 90600) #if (PG_VERSION_NUM < 90600)
/* /*
@ -635,6 +654,12 @@ outNode(StringInfo str, const void *obj)
appendStringInfoChar(str, '}'); appendStringInfoChar(str, '}');
break; break;
case T_DeferredErrorMessage:
appendStringInfoChar(str, '{');
OutDeferredErrorMessage(str, obj);
appendStringInfoChar(str, '}');
break;
default: default:
/* fall back into postgres' normal nodeToString machinery */ /* fall back into postgres' normal nodeToString machinery */
appendStringInfoString(str, nodeToString(obj)); appendStringInfoString(str, nodeToString(obj));

View File

@ -14,6 +14,7 @@
#include <math.h> #include <math.h>
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/errormessage.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/readfuncs.h" #include "nodes/readfuncs.h"
@ -186,6 +187,7 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName); READ_STRING_FIELD(masterTableName);
READ_BOOL_FIELD(routerExecutable); READ_BOOL_FIELD(routerExecutable);
READ_NODE_FIELD(planningError);
READ_DONE(); READ_DONE();
} }
@ -314,6 +316,24 @@ ReadTask(READFUNC_ARGS)
READ_DONE(); READ_DONE();
} }
READFUNC_RET
ReadDeferredErrorMessage(READFUNC_ARGS)
{
READ_LOCALS(DeferredErrorMessage);
READ_INT_FIELD(code);
READ_STRING_FIELD(message);
READ_STRING_FIELD(detail);
READ_STRING_FIELD(hint);
READ_STRING_FIELD(filename);
READ_INT_FIELD(linenumber);
READ_STRING_FIELD(functionname);
READ_DONE();
}
READFUNC_RET READFUNC_RET
ReadUnsupportedCitusNode(READFUNC_ARGS) ReadUnsupportedCitusNode(READFUNC_ARGS)
{ {

View File

@ -1519,6 +1519,8 @@ CitusParseNodeString(void)
return_value = ReadRelationShard(); return_value = ReadRelationShard();
else if (MATCH("TASK", 4)) else if (MATCH("TASK", 4))
return_value = ReadTask(); return_value = ReadTask();
else if (MATCH("DEFERREDERRORMESSAGE", 20))
return_value = ReadDeferredErrorMessage();
/* XXX: END Citus Nodes */ /* XXX: END Citus Nodes */
else else
{ {

View File

@ -0,0 +1,58 @@
/*
* errormessage.c
* Error handling related support functionality.
*
* Copyright (c) 2017, Citus Data, Inc.
*/
#include "postgres.h"
#include "distributed/citus_nodes.h"
#include "distributed/errormessage.h"
/*
* DeferredErrorInternal is a helper function for DeferredError().
*/
DeferredErrorMessage *
DeferredErrorInternal(int code, const char *message, const char *detail, const char *hint,
const char *filename, int linenumber, const char *functionname)
{
DeferredErrorMessage *error = CitusMakeNode(DeferredErrorMessage);
error->code = code;
error->message = message;
error->detail = detail;
error->hint = hint;
error->filename = filename;
error->linenumber = linenumber;
error->functionname = functionname;
return error;
}
/*
* RaiseDeferredErrorInternal is a helper function for RaiseDeferredError().
*/
void
RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel)
{
ErrorData *errorData = palloc0(sizeof(ErrorData));
errorData->sqlerrcode = error->code;
errorData->elevel = elevel;
errorData->message = pstrdup(error->message);
if (error->detail)
{
errorData->detail = pstrdup(error->detail);
}
if (error->hint)
{
errorData->hint = pstrdup(error->hint);
}
errorData->filename = pstrdup(error->filename);
errorData->lineno = error->linenumber;
errorData->funcname = error->functionname;
ThrowErrorData(errorData);
}

View File

@ -68,6 +68,7 @@ extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
@ -78,6 +79,7 @@ extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS); extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutMultiNode(OUTFUNC_ARGS); extern void OutMultiNode(OUTFUNC_ARGS);
extern void OutMultiTreeRoot(OUTFUNC_ARGS); extern void OutMultiTreeRoot(OUTFUNC_ARGS);

View File

@ -56,7 +56,8 @@ typedef enum CitusNodeTag
T_Task, T_Task,
T_ShardInterval, T_ShardInterval,
T_ShardPlacement, T_ShardPlacement,
T_RelationShard T_RelationShard,
T_DeferredErrorMessage
} CitusNodeTag; } CitusNodeTag;
@ -99,6 +100,8 @@ CitusNodeTagI(Node *node)
#else #else
#include "nodes/nodes.h"
typedef CitusNodeTag CitusNode; typedef CitusNodeTag CitusNode;
/* /*
* nodeTag equivalent that returns the node tag for both citus and postgres * nodeTag equivalent that returns the node tag for both citus and postgres

View File

@ -0,0 +1,60 @@
/*-------------------------------------------------------------------------
*
* errormessage.h
* Error handling related support functionality.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef ERRORMESSAGE_H
#define ERRORMESSAGE_H
#include "distributed/citus_nodes.h"
typedef struct DeferredErrorMessage
{
CitusNode tag;
int code;
const char *message;
const char *detail;
const char *hint;
const char *filename;
int linenumber;
const char *functionname;
} DeferredErrorMessage;
/*
* DeferredError allocates a deferred error message, that can later be emitted
* using RaiseDeferredError(). These error messages can be
* serialized/copied/deserialized, i.e. can be embedded in plans and such.
*/
#define DeferredError(code, message, detail, hint) \
DeferredErrorInternal(code, message, detail, hint, __FILE__, __LINE__, __func__)
DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, const
char *detail, const char *hint,
const char *filename, int linenumber, const
char *functionname);
/*
* RaiseDeferredError emits a previously allocated error using the specified
* severity.
*
* The trickery with __builtin_constant_p/pg_unreachable aims to have the
* compiler understand that the function will not return if elevel >= ERROR.
*/
#define RaiseDeferredError(error, elevel) \
do { \
RaiseDeferredErrorInternal(error, elevel); \
if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
pg_unreachable(); } \
} while (0)
void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);
#endif

View File

@ -19,6 +19,7 @@
#include "datatype/timestamp.h" #include "datatype/timestamp.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/errormessage.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -215,6 +216,13 @@ typedef struct MultiPlan
Query *masterQuery; Query *masterQuery;
char *masterTableName; char *masterTableName;
bool routerExecutable; bool routerExecutable;
/*
* NULL if this a valid plan, an error description otherwise. This will
* e.g. be set if SQL features are present that a planner doesn't support,
* or if prepared statement parameters prevented successful planning.
*/
DeferredErrorMessage *planningError;
} MultiPlan; } MultiPlan;

View File

@ -56,5 +56,7 @@ struct MultiPlan;
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte); Index index, RangeTblEntry *rte);
extern bool IsModifyCommand(Query *query);
extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan);
#endif /* MULTI_PLANNER_H */ #endif /* MULTI_PLANNER_H */

View File

@ -14,6 +14,7 @@
#include "c.h" #include "c.h"
#include "distributed/errormessage.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -28,10 +29,13 @@
extern bool EnableRouterExecution; extern bool EnableRouterExecution;
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext);
extern void AddUninstantiatedPartitionRestriction(Query *originalQuery); extern void AddUninstantiatedPartitionRestriction(Query *originalQuery);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree); extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte); RangeTblEntry *subqueryRte);

View File

@ -727,3 +727,10 @@ Distributed Query into pg_merge_job_570039
Master Query Master Query
-> Aggregate -> Aggregate
-> Seq Scan on pg_merge_job_570039 -> Seq Scan on pg_merge_job_570039
-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PLPGSQL functions instead.

View File

@ -698,3 +698,10 @@ Distributed Query into pg_merge_job_570039
Master Query Master Query
-> Aggregate -> Aggregate
-> Seq Scan on pg_merge_job_570039 -> Seq Scan on pg_merge_job_570039
-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PLPGSQL functions instead.

View File

@ -177,16 +177,14 @@ SELECT
user_id, (random()*10)::int user_id, (random()*10)::int
FROM FROM
raw_events_first; raw_events_first;
ERROR: cannot perform distributed planning for the given modification ERROR: volatile functions are not allowed in INSERT ... SELECT queries
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
INSERT INTO raw_events_second (user_id, value_1) INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT (random()*10)::int) WITH sub_cte AS (SELECT (random()*10)::int)
SELECT SELECT
user_id, (SELECT * FROM sub_cte) user_id, (SELECT * FROM sub_cte)
FROM FROM
raw_events_first; raw_events_first;
ERROR: cannot perform distributed planning for the given modification ERROR: volatile functions are not allowed in INSERT ... SELECT queries
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
-- add one more row -- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES INSERT INTO raw_events_first (user_id, time) VALUES
(7, now()); (7, now());
@ -1098,8 +1096,7 @@ INSERT INTO agg_events (value_1_agg, user_id)
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification ERROR: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries
DETAIL: DISTINCT ON clauses are not allowed in INSERT ... SELECT queries
-- We do not support some CTEs -- We do not support some CTEs
WITH fist_table_agg AS WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id) (SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)
@ -1112,7 +1109,7 @@ INSERT INTO agg_events
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
-- We do support some CTEs -- We do support some CTEs
INSERT INTO agg_events INSERT INTO agg_events
@ -1156,8 +1153,7 @@ FROM
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification ERROR: set operations are not allowed in INSERT ... SELECT queries
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- We do not support any set operations -- We do not support any set operations
INSERT INTO INSERT INTO
raw_events_first(user_id) raw_events_first(user_id)
@ -1166,8 +1162,7 @@ INSERT INTO
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification ERROR: set operations are not allowed in INSERT ... SELECT queries
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- We do not support any set operations -- We do not support any set operations
INSERT INTO INSERT INTO
raw_events_first(user_id) raw_events_first(user_id)
@ -1179,8 +1174,7 @@ FROM
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification ERROR: set operations are not allowed in INSERT ... SELECT queries
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- unsupported JOIN -- unsupported JOIN
INSERT INTO agg_events INSERT INTO agg_events
(value_4_agg, (value_4_agg,
@ -1219,7 +1213,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
-- error cases -- error cases
-- no part column at all -- no part column at all
@ -1230,7 +1224,8 @@ FROM raw_events_first;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because the query doesn't include the target table's partition column ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: the query doesn't include the target table's partition column
INSERT INTO raw_events_second INSERT INTO raw_events_second
(value_1) (value_1)
SELECT user_id SELECT user_id
@ -1238,7 +1233,8 @@ FROM raw_events_first;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because the query doesn't include the target table's partition column ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: the query doesn't include the target table's partition column
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
SELECT value_1 SELECT value_1
@ -1246,7 +1242,7 @@ FROM raw_events_first;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1255,7 +1251,7 @@ FROM raw_events_first;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an operator in the same position as the target table's partition column. DETAIL: Subquery contains an operator in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO raw_events_second INSERT INTO raw_events_second
@ -1265,7 +1261,7 @@ FROM raw_events_first;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column. DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO agg_events INSERT INTO agg_events
@ -1284,7 +1280,7 @@ GROUP BY user_id;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an aggregation in the same position as the target table's partition column. DETAIL: Subquery contains an aggregation in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO agg_events INSERT INTO agg_events
@ -1304,7 +1300,7 @@ GROUP BY user_id,
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
-- tables should be co-located -- tables should be co-located
INSERT INTO agg_events (user_id) INSERT INTO agg_events (user_id)
@ -1315,7 +1311,7 @@ FROM
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
-- unsupported joins between subqueries -- unsupported joins between subqueries
-- we do not return bare partition column on the inner query -- we do not return bare partition column on the inner query
@ -1344,7 +1340,7 @@ ON (f.id = f2.id);
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
-- the second part of the query is not routable since -- the second part of the query is not routable since
@ -1428,8 +1424,7 @@ GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) );
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
DEBUG: StartTransaction DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification ERROR: grouping sets are not allowed in INSERT ... SELECT queries
DETAIL: Grouping sets are not allowed in INSERT ... SELECT queries
-- set back to INFO -- set back to INFO
SET client_min_messages TO INFO; SET client_min_messages TO INFO;
DEBUG: StartTransactionCommand DEBUG: StartTransactionCommand
@ -1999,8 +1994,7 @@ FROM
table_with_defaults table_with_defaults
GROUP BY GROUP BY
store_id; store_id;
ERROR: cannot perform distributed planning for the given modification ERROR: volatile functions are not allowed in INSERT ... SELECT queries
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
-- do some more error/error message checks -- do some more error/error message checks
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
@ -2030,42 +2024,42 @@ INSERT INTO text_table (part_col)
CASE WHEN part_col = 'onder' THEN 'marco' CASE WHEN part_col = 'onder' THEN 'marco'
END END
FROM text_table ; FROM text_table ;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains a case expression in the same position as the target table's partition column. DETAIL: Subquery contains a case expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table; INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column. DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table; INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table; INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table; INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table; INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT part_col::text from char_table; INSERT INTO text_table (part_col) SELECT part_col::text from char_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
INSERT INTO text_table (part_col) SELECT val FROM text_table; INSERT INTO text_table (part_col) SELECT val FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
INSERT INTO text_table (part_col) SELECT val::text FROM text_table; INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults;

View File

@ -216,8 +216,7 @@ DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES; INSERT INTO limit_orders DEFAULT VALUES;
ERROR: cannot perform distributed planning for the given modification ERROR: common table expressions are not supported in distributed modifications
DETAIL: Common table expressions are not supported in distributed modifications.
-- test simple DELETE -- test simple DELETE
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
SELECT COUNT(*) FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246;
@ -275,8 +274,7 @@ ERROR: cannot plan queries that include both regular and partitioned relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders; DELETE FROM limit_orders;
ERROR: cannot perform distributed planning for the given modification ERROR: common table expressions are not supported in distributed modifications
DETAIL: Common table expressions are not supported in distributed modifications.
-- cursors are not supported -- cursors are not supported
DELETE FROM limit_orders WHERE CURRENT OF cursor_name; DELETE FROM limit_orders WHERE CURRENT OF cursor_name;
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
@ -421,8 +419,7 @@ ERROR: cannot plan queries that include both regular and partitioned relations
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
UPDATE limit_orders SET symbol = 'GM'; UPDATE limit_orders SET symbol = 'GM';
ERROR: cannot perform distributed planning for the given modification ERROR: common table expressions are not supported in distributed modifications
DETAIL: Common table expressions are not supported in distributed modifications.
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
symbol | bidder_id symbol | bidder_id
--------+----------- --------+-----------

View File

@ -265,9 +265,18 @@ SELECT plpgsql_test_2();
(1 row) (1 row)
-- run PL/pgsql functions with different parameters -- run PL/pgsql functions with different parameters
-- FIXME: temporarily disabled, waiting for proper parametrized query support SELECT plpgsql_test_6(155);
-- SELECT plpgsql_test_6(155); plpgsql_test_6
-- SELECT plpgsql_test_6(1555); ----------------
11811
(1 row)
SELECT plpgsql_test_6(1555);
plpgsql_test_6
----------------
10183
(1 row)
-- test router executor parameterized PL/pgsql functions -- test router executor parameterized PL/pgsql functions
CREATE TABLE plpgsql_table ( CREATE TABLE plpgsql_table (
key int, key int,
@ -365,9 +374,11 @@ SELECT single_parameter_insert(5);
(1 row) (1 row)
SELECT single_parameter_insert(6); SELECT single_parameter_insert(6);
ERROR: values given for the partition column must be constants or constant expressions single_parameter_insert
CONTEXT: SQL statement "INSERT INTO plpgsql_table (key) VALUES (key_arg)" -------------------------
PL/pgSQL function single_parameter_insert(integer) line 3 at SQL statement
(1 row)
CREATE FUNCTION double_parameter_insert(key_arg int, value_arg int) CREATE FUNCTION double_parameter_insert(key_arg int, value_arg int)
RETURNS void as $$ RETURNS void as $$
BEGIN BEGIN
@ -406,9 +417,11 @@ SELECT double_parameter_insert(5, 50);
(1 row) (1 row)
SELECT double_parameter_insert(6, 60); SELECT double_parameter_insert(6, 60);
ERROR: values given for the partition column must be constants or constant expressions double_parameter_insert
CONTEXT: SQL statement "INSERT INTO plpgsql_table (key, value) VALUES (key_arg, value_arg)" -------------------------
PL/pgSQL function double_parameter_insert(integer,integer) line 3 at SQL statement
(1 row)
CREATE FUNCTION non_partition_parameter_insert(value_arg int) CREATE FUNCTION non_partition_parameter_insert(value_arg int)
RETURNS void as $$ RETURNS void as $$
BEGIN BEGIN
@ -478,7 +491,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
4 | 4 |
5 | 50 5 | 50
5 | 5 |
(22 rows) 6 | 60
6 |
(24 rows)
-- check router executor select -- check router executor select
CREATE FUNCTION router_partition_column_select(key_arg int) CREATE FUNCTION router_partition_column_select(key_arg int)
@ -498,6 +513,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT router_partition_column_select(1); SELECT router_partition_column_select(1);
router_partition_column_select router_partition_column_select
-------------------------------- --------------------------------
@ -533,9 +549,13 @@ SELECT router_partition_column_select(5);
(5,) (5,)
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test SELECT router_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. router_partition_column_select
-- SELECT router_partition_column_select(6); --------------------------------
(6,60)
(6,)
(2 rows)
CREATE FUNCTION router_non_partition_column_select(value_arg int) CREATE FUNCTION router_non_partition_column_select(value_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
DECLARE DECLARE
@ -554,6 +574,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT router_non_partition_column_select(10); SELECT router_non_partition_column_select(10);
router_non_partition_column_select router_non_partition_column_select
------------------------------------ ------------------------------------
@ -608,6 +629,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT real_time_non_partition_column_select(10); SELECT real_time_non_partition_column_select(10);
real_time_non_partition_column_select real_time_non_partition_column_select
--------------------------------------- ---------------------------------------
@ -643,9 +665,13 @@ SELECT real_time_non_partition_column_select(50);
(5,50) (5,50)
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test SELECT real_time_non_partition_column_select(60);
-- because of changing output. After implementing this feature, uncomment this. real_time_non_partition_column_select
-- SELECT real_time_non_partition_column_select(60); ---------------------------------------
(0,60)
(6,60)
(2 rows)
CREATE FUNCTION real_time_partition_column_select(key_arg int) CREATE FUNCTION real_time_partition_column_select(key_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
DECLARE DECLARE
@ -664,6 +690,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT real_time_partition_column_select(1); SELECT real_time_partition_column_select(1);
real_time_partition_column_select real_time_partition_column_select
----------------------------------- -----------------------------------
@ -708,9 +735,15 @@ SELECT real_time_partition_column_select(5);
(5,) (5,)
(4 rows) (4 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test SELECT real_time_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. real_time_partition_column_select
-- SELECT real_time_partition_column_select(6); -----------------------------------
(0,10)
(1,10)
(6,60)
(6,)
(4 rows)
-- check task-tracker executor -- check task-tracker executor
SET citus.task_executor_type TO 'task-tracker'; SET citus.task_executor_type TO 'task-tracker';
CREATE FUNCTION task_tracker_non_partition_column_select(value_arg int) CREATE FUNCTION task_tracker_non_partition_column_select(value_arg int)
@ -730,6 +763,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT task_tracker_non_partition_column_select(10); SELECT task_tracker_non_partition_column_select(10);
task_tracker_non_partition_column_select task_tracker_non_partition_column_select
------------------------------------------ ------------------------------------------
@ -765,9 +799,13 @@ SELECT task_tracker_non_partition_column_select(50);
(5,50) (5,50)
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test SELECT real_time_non_partition_column_select(60);
-- because of changing output. After implementing this feature, uncomment this. real_time_non_partition_column_select
-- SELECT real_time_non_partition_column_select(60); ---------------------------------------
(0,60)
(6,60)
(2 rows)
CREATE FUNCTION task_tracker_partition_column_select(key_arg int) CREATE FUNCTION task_tracker_partition_column_select(key_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
DECLARE DECLARE
@ -786,6 +824,7 @@ BEGIN
value; value;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT task_tracker_partition_column_select(1); SELECT task_tracker_partition_column_select(1);
task_tracker_partition_column_select task_tracker_partition_column_select
-------------------------------------- --------------------------------------
@ -830,9 +869,15 @@ SELECT task_tracker_partition_column_select(5);
(5,) (5,)
(4 rows) (4 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test SELECT task_tracker_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. task_tracker_partition_column_select
-- SELECT task_tracker_partition_column_select(6); --------------------------------------
(0,10)
(1,10)
(6,60)
(6,)
(4 rows)
SET citus.task_executor_type TO 'real-time'; SET citus.task_executor_type TO 'real-time';
-- check updates -- check updates
CREATE FUNCTION partition_parameter_update(int, int) RETURNS void as $$ CREATE FUNCTION partition_parameter_update(int, int) RETURNS void as $$
@ -871,8 +916,7 @@ SELECT partition_parameter_update(5, 51);
(1 row) (1 row)
-- This fails with an unexpected error message SELECT partition_parameter_update(6, 61);
SELECT partition_parameter_update(5, 52);
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
DETAIL: This command modifies all shards. DETAIL: This command modifies all shards.
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
@ -946,7 +990,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
4 | 41 4 | 41
5 | 51 5 | 51
5 | 51 5 | 51
(22 rows) 6 | 60
6 |
(24 rows)
-- check deletes -- check deletes
CREATE FUNCTION partition_parameter_delete(int, int) RETURNS void as $$ CREATE FUNCTION partition_parameter_delete(int, int) RETURNS void as $$
@ -954,6 +1000,7 @@ BEGIN
DELETE FROM plpgsql_table WHERE key = $1 AND value = $2; DELETE FROM plpgsql_table WHERE key = $1 AND value = $2;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT partition_parameter_delete(1, 11); SELECT partition_parameter_delete(1, 11);
partition_parameter_delete partition_parameter_delete
---------------------------- ----------------------------
@ -984,8 +1031,7 @@ SELECT partition_parameter_delete(5, 51);
(1 row) (1 row)
-- This fails with an unexpected error message SELECT partition_parameter_delete(6, 61);
SELECT partition_parameter_delete(0, 10);
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
DETAIL: This command modifies all shards. DETAIL: This command modifies all shards.
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
@ -1043,7 +1089,9 @@ SELECT * FROM plpgsql_table ORDER BY key, value;
0 | 0 |
0 | 0 |
0 | 0 |
(6 rows) 6 | 60
6 |
(8 rows)
-- check whether we can handle execute parameters -- check whether we can handle execute parameters
CREATE TABLE execute_parameter_test (key int, val date); CREATE TABLE execute_parameter_test (key int, val date);

View File

@ -240,8 +240,12 @@ EXECUTE prepared_test_6(155);
11811 11811
(1 row) (1 row)
-- FIXME: temporarily disabled EXECUTE prepared_test_6(1555);
-- EXECUTE prepared_test_6(1555); count
-------
10183
(1 row)
-- test router executor with parameterized non-partition columns -- test router executor with parameterized non-partition columns
-- create a custom type which also exists on worker nodes -- create a custom type which also exists on worker nodes
CREATE TYPE test_composite_type AS ( CREATE TYPE test_composite_type AS (
@ -325,17 +329,16 @@ EXECUTE prepared_select(6, 60);
1 1
(1 row) (1 row)
-- test that we don't crash on failing parameterized insert on the partition column -- Test that parameterized partition column for an insert is supported
PREPARE prepared_partition_column_insert(bigint) AS PREPARE prepared_partition_column_insert(bigint) AS
INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)'); INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');
-- we error out on the 6th execution -- execute 6 times to trigger prepared statement usage
EXECUTE prepared_partition_column_insert(1); EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2); EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3); EXECUTE prepared_partition_column_insert(3);
EXECUTE prepared_partition_column_insert(4); EXECUTE prepared_partition_column_insert(4);
EXECUTE prepared_partition_column_insert(5); EXECUTE prepared_partition_column_insert(5);
EXECUTE prepared_partition_column_insert(6); EXECUTE prepared_partition_column_insert(6);
ERROR: values given for the partition column must be constants or constant expressions
DROP TYPE test_composite_type CASCADE; DROP TYPE test_composite_type CASCADE;
NOTICE: drop cascades to table router_executor_table column stats NOTICE: drop cascades to table router_executor_table column stats
-- test router executor with prepare statements -- test router executor with prepare statements
@ -373,7 +376,6 @@ EXECUTE prepared_single_parameter_insert(3);
EXECUTE prepared_single_parameter_insert(4); EXECUTE prepared_single_parameter_insert(4);
EXECUTE prepared_single_parameter_insert(5); EXECUTE prepared_single_parameter_insert(5);
EXECUTE prepared_single_parameter_insert(6); EXECUTE prepared_single_parameter_insert(6);
ERROR: values given for the partition column must be constants or constant expressions
PREPARE prepared_double_parameter_insert(int, int) AS PREPARE prepared_double_parameter_insert(int, int) AS
INSERT INTO prepare_table (key, value) VALUES ($1, $2); INSERT INTO prepare_table (key, value) VALUES ($1, $2);
-- execute 6 times to trigger prepared statement usage -- execute 6 times to trigger prepared statement usage
@ -383,7 +385,6 @@ EXECUTE prepared_double_parameter_insert(3, 30);
EXECUTE prepared_double_parameter_insert(4, 40); EXECUTE prepared_double_parameter_insert(4, 40);
EXECUTE prepared_double_parameter_insert(5, 50); EXECUTE prepared_double_parameter_insert(5, 50);
EXECUTE prepared_double_parameter_insert(6, 60); EXECUTE prepared_double_parameter_insert(6, 60);
ERROR: values given for the partition column must be constants or constant expressions
PREPARE prepared_non_partition_parameter_insert(int) AS PREPARE prepared_non_partition_parameter_insert(int) AS
INSERT INTO prepare_table (key, value) VALUES (0, $1); INSERT INTO prepare_table (key, value) VALUES (0, $1);
-- execute 6 times to trigger prepared statement usage -- execute 6 times to trigger prepared statement usage
@ -419,7 +420,9 @@ SELECT * FROM prepare_table ORDER BY key, value;
4 | 4 |
5 | 50 5 | 50
5 | 5 |
(22 rows) 6 | 60
6 |
(24 rows)
-- check router executor select -- check router executor select
PREPARE prepared_router_partition_column_select(int) AS PREPARE prepared_router_partition_column_select(int) AS
@ -468,9 +471,13 @@ EXECUTE prepared_router_partition_column_select(5);
5 | 5 |
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test EXECUTE prepared_router_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. key | value
-- EXECUTE prepared_router_partition_column_select(6); -----+-------
6 | 60
6 |
(2 rows)
PREPARE prepared_router_non_partition_column_select(int) AS PREPARE prepared_router_non_partition_column_select(int) AS
SELECT SELECT
prepare_table.key, prepare_table.key,
@ -566,9 +573,13 @@ EXECUTE prepared_real_time_non_partition_column_select(50);
5 | 50 5 | 50
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test EXECUTE prepared_real_time_non_partition_column_select(60);
-- because of changing output. After implementing this feature, uncomment this. key | value
-- EXECUTE prepared_real_time_non_partition_column_select(60); -----+-------
0 | 60
6 | 60
(2 rows)
PREPARE prepared_real_time_partition_column_select(int) AS PREPARE prepared_real_time_partition_column_select(int) AS
SELECT SELECT
prepare_table.key, prepare_table.key,
@ -625,9 +636,15 @@ EXECUTE prepared_real_time_partition_column_select(5);
5 | 5 |
(4 rows) (4 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test EXECUTE prepared_real_time_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. key | value
-- EXECUTE prepared_real_time_partition_column_select(6); -----+-------
0 | 10
1 | 10
6 | 60
6 |
(4 rows)
-- check task-tracker executor -- check task-tracker executor
SET citus.task_executor_type TO 'task-tracker'; SET citus.task_executor_type TO 'task-tracker';
PREPARE prepared_task_tracker_non_partition_column_select(int) AS PREPARE prepared_task_tracker_non_partition_column_select(int) AS
@ -676,9 +693,13 @@ EXECUTE prepared_task_tracker_non_partition_column_select(50);
5 | 50 5 | 50
(2 rows) (2 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test EXECUTE prepared_task_tracker_non_partition_column_select(60);
-- because of changing output. After implementing this feature, uncomment this. key | value
-- EXECUTE prepared_task_tracker_non_partition_column_select(60); -----+-------
0 | 60
6 | 60
(2 rows)
PREPARE prepared_task_tracker_partition_column_select(int) AS PREPARE prepared_task_tracker_partition_column_select(int) AS
SELECT SELECT
prepare_table.key, prepare_table.key,
@ -735,9 +756,15 @@ EXECUTE prepared_task_tracker_partition_column_select(5);
5 | 5 |
(4 rows) (4 rows)
-- FIXME: 6th execution is failing. We don't want to run the failing test EXECUTE prepared_task_tracker_partition_column_select(6);
-- because of changing output. After implementing this feature, uncomment this. key | value
-- EXECUTE prepared_task_tracker_partition_column_select(6); -----+-------
0 | 10
1 | 10
6 | 60
6 |
(4 rows)
SET citus.task_executor_type TO 'real-time'; SET citus.task_executor_type TO 'real-time';
-- check updates -- check updates
PREPARE prepared_partition_parameter_update(int, int) AS PREPARE prepared_partition_parameter_update(int, int) AS
@ -748,8 +775,7 @@ EXECUTE prepared_partition_parameter_update(2, 21);
EXECUTE prepared_partition_parameter_update(3, 31); EXECUTE prepared_partition_parameter_update(3, 31);
EXECUTE prepared_partition_parameter_update(4, 41); EXECUTE prepared_partition_parameter_update(4, 41);
EXECUTE prepared_partition_parameter_update(5, 51); EXECUTE prepared_partition_parameter_update(5, 51);
-- This fails with an unexpected error message EXECUTE prepared_partition_parameter_update(6, 61);
EXECUTE prepared_partition_parameter_update(5, 52);
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
DETAIL: This command modifies all shards. DETAIL: This command modifies all shards.
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
@ -788,7 +814,9 @@ SELECT * FROM prepare_table ORDER BY key, value;
4 | 41 4 | 41
5 | 51 5 | 51
5 | 51 5 | 51
(22 rows) 6 | 60
6 |
(24 rows)
-- check deletes -- check deletes
PREPARE prepared_partition_parameter_delete(int, int) AS PREPARE prepared_partition_parameter_delete(int, int) AS
@ -798,8 +826,7 @@ EXECUTE prepared_partition_parameter_delete(2, 21);
EXECUTE prepared_partition_parameter_delete(3, 31); EXECUTE prepared_partition_parameter_delete(3, 31);
EXECUTE prepared_partition_parameter_delete(4, 41); EXECUTE prepared_partition_parameter_delete(4, 41);
EXECUTE prepared_partition_parameter_delete(5, 51); EXECUTE prepared_partition_parameter_delete(5, 51);
-- This fails with an unexpected error message EXECUTE prepared_partition_parameter_delete(6, 61);
EXECUTE prepared_partition_parameter_delete(0, 10);
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
DETAIL: This command modifies all shards. DETAIL: This command modifies all shards.
HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations. HINT: Consider using an equality filter on partition column "key". You can use master_modify_multiple_shards() to perform multi-shard delete or update operations.
@ -822,7 +849,9 @@ SELECT * FROM prepare_table ORDER BY key, value;
0 | 0 |
0 | 0 |
0 | 0 |
(6 rows) 6 | 60
6 |
(8 rows)
-- verify placement state updates invalidate shard state -- verify placement state updates invalidate shard state
-- --

View File

@ -1091,7 +1091,7 @@ FROM
colocated_table_test, colocated_table_test_2 colocated_table_test, colocated_table_test_2
WHERE WHERE
colocated_table_test.value_1 = colocated_table_test.value_1; colocated_table_test.value_1 = colocated_table_test.value_1;
ERROR: If data inserted into a reference table, all of the participating tables in the INSERT INTO ... SELECT query should be reference tables. ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT
-- should error out, same as the above -- should error out, same as the above
INSERT INTO INSERT INTO
reference_table_test (value_1) reference_table_test (value_1)
@ -1101,7 +1101,7 @@ FROM
colocated_table_test, reference_table_test colocated_table_test, reference_table_test
WHERE WHERE
colocated_table_test.value_1 = reference_table_test.value_1; colocated_table_test.value_1 = reference_table_test.value_1;
ERROR: If data inserted into a reference table, all of the participating tables in the INSERT INTO ... SELECT query should be reference tables. ERROR: only reference tables may be queried when targeting a reference table with INSERT ... SELECT
-- now, insert into the hash partitioned table and use reference -- now, insert into the hash partitioned table and use reference
-- tables in the SELECT queries -- tables in the SELECT queries
INSERT INTO INSERT INTO
@ -1146,7 +1146,7 @@ FROM
WHERE WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4 colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2; RETURNING value_1, value_2;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
-- partition column value comes from reference table which should error out -- partition column value comes from reference table which should error out
INSERT INTO INSERT INTO
@ -1158,7 +1158,7 @@ FROM
WHERE WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4 colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2; RETURNING value_1, value_2;
ERROR: cannot plan INSERT INTO ... SELECT because partition columns in the target table and the subquery do not match ERROR: INSERT INTO ... SELECT partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery. DETAIL: The target table's partition column should correspond to a partition column in the subquery.
-- some tests for mark_tables_colocated -- some tests for mark_tables_colocated
-- should error out -- should error out

View File

@ -444,8 +444,7 @@ WITH new_article AS (
INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING * INSERT INTO articles_hash VALUES (1, 1, 'arsenous', 9572) RETURNING *
) )
SELECT * FROM new_article; SELECT * FROM new_article;
ERROR: cannot perform distributed planning for the given modification ERROR: data-modifying statements are not supported in the WITH clauses of distributed queries
DETAIL: Data-modifying statements are not supported in the WITH clauses of distributed queries.
-- Modifying statement in nested CTE case is covered by PostgreSQL itself -- Modifying statement in nested CTE case is covered by PostgreSQL itself
WITH new_article AS ( WITH new_article AS (
WITH nested_cte AS ( WITH nested_cte AS (

View File

@ -92,8 +92,7 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE
ERROR: master_modify_multiple_shards() does not support RETURNING ERROR: master_modify_multiple_shards() does not support RETURNING
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test');
ERROR: cannot perform distributed planning for the given modification ERROR: common table expressions are not supported in distributed modifications
DETAIL: Common table expressions are not supported in distributed modifications.
-- Check that we can successfully delete from multiple shards with 1PC -- Check that we can successfully delete from multiple shards with 1PC
SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_commit_protocol TO '1pc';
SELECT count(*) FROM multi_shard_modify_test; SELECT count(*) FROM multi_shard_modify_test;
@ -234,8 +233,7 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=
ERROR: master_modify_multiple_shards() does not support RETURNING ERROR: master_modify_multiple_shards() does not support RETURNING
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' ');
ERROR: cannot perform distributed planning for the given modification ERROR: common table expressions are not supported in distributed modifications
DETAIL: Common table expressions are not supported in distributed modifications.
-- updates referencing just a var are supported -- updates referencing just a var are supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=t_key WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value=t_key WHERE t_key = 10');
master_modify_multiple_shards master_modify_multiple_shards

View File

@ -315,7 +315,9 @@ SELECT * FROM prepare_table ORDER BY key, value;
0 | 0 |
0 | 0 |
0 | 0 |
(6 rows) 6 | 60
6 |
(8 rows)
DROP TABLE temp_table; DROP TABLE temp_table;
-- clean-up functions -- clean-up functions

View File

@ -237,8 +237,7 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL
-- subquery in the SET clause -- subquery in the SET clause
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = (SELECT count(*) from upsert_test); UPDATE SET other_col = (SELECT count(*) from upsert_test);
ERROR: cannot perform distributed planning for the given modification ERROR: subqueries are not supported in distributed modifications
DETAIL: Subqueries are not supported in distributed modifications.
-- non mutable function call in the SET -- non mutable function call in the SET
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = random()::int; UPDATE SET other_col = random()::int;

View File

@ -227,3 +227,8 @@ EXPLAIN EXECUTE router_executor_query;
PREPARE real_time_executor_query AS PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);

View File

@ -183,9 +183,8 @@ SELECT plpgsql_test_1();
SELECT plpgsql_test_2(); SELECT plpgsql_test_2();
-- run PL/pgsql functions with different parameters -- run PL/pgsql functions with different parameters
-- FIXME: temporarily disabled, waiting for proper parametrized query support SELECT plpgsql_test_6(155);
-- SELECT plpgsql_test_6(155); SELECT plpgsql_test_6(1555);
-- SELECT plpgsql_test_6(1555);
-- test router executor parameterized PL/pgsql functions -- test router executor parameterized PL/pgsql functions
CREATE TABLE plpgsql_table ( CREATE TABLE plpgsql_table (
@ -276,15 +275,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT router_partition_column_select(1); SELECT router_partition_column_select(1);
SELECT router_partition_column_select(2); SELECT router_partition_column_select(2);
SELECT router_partition_column_select(3); SELECT router_partition_column_select(3);
SELECT router_partition_column_select(4); SELECT router_partition_column_select(4);
SELECT router_partition_column_select(5); SELECT router_partition_column_select(5);
SELECT router_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- SELECT router_partition_column_select(6);
CREATE FUNCTION router_non_partition_column_select(value_arg int) CREATE FUNCTION router_non_partition_column_select(value_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
@ -305,6 +302,7 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT router_non_partition_column_select(10); SELECT router_non_partition_column_select(10);
SELECT router_non_partition_column_select(20); SELECT router_non_partition_column_select(20);
SELECT router_non_partition_column_select(30); SELECT router_non_partition_column_select(30);
@ -331,15 +329,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT real_time_non_partition_column_select(10); SELECT real_time_non_partition_column_select(10);
SELECT real_time_non_partition_column_select(20); SELECT real_time_non_partition_column_select(20);
SELECT real_time_non_partition_column_select(30); SELECT real_time_non_partition_column_select(30);
SELECT real_time_non_partition_column_select(40); SELECT real_time_non_partition_column_select(40);
SELECT real_time_non_partition_column_select(50); SELECT real_time_non_partition_column_select(50);
SELECT real_time_non_partition_column_select(60);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- SELECT real_time_non_partition_column_select(60);
CREATE FUNCTION real_time_partition_column_select(key_arg int) CREATE FUNCTION real_time_partition_column_select(key_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
@ -360,15 +356,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT real_time_partition_column_select(1); SELECT real_time_partition_column_select(1);
SELECT real_time_partition_column_select(2); SELECT real_time_partition_column_select(2);
SELECT real_time_partition_column_select(3); SELECT real_time_partition_column_select(3);
SELECT real_time_partition_column_select(4); SELECT real_time_partition_column_select(4);
SELECT real_time_partition_column_select(5); SELECT real_time_partition_column_select(5);
SELECT real_time_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- SELECT real_time_partition_column_select(6);
-- check task-tracker executor -- check task-tracker executor
SET citus.task_executor_type TO 'task-tracker'; SET citus.task_executor_type TO 'task-tracker';
@ -391,15 +385,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT task_tracker_non_partition_column_select(10); SELECT task_tracker_non_partition_column_select(10);
SELECT task_tracker_non_partition_column_select(20); SELECT task_tracker_non_partition_column_select(20);
SELECT task_tracker_non_partition_column_select(30); SELECT task_tracker_non_partition_column_select(30);
SELECT task_tracker_non_partition_column_select(40); SELECT task_tracker_non_partition_column_select(40);
SELECT task_tracker_non_partition_column_select(50); SELECT task_tracker_non_partition_column_select(50);
SELECT real_time_non_partition_column_select(60);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- SELECT real_time_non_partition_column_select(60);
CREATE FUNCTION task_tracker_partition_column_select(key_arg int) CREATE FUNCTION task_tracker_partition_column_select(key_arg int)
RETURNS TABLE(key int, value int) AS $$ RETURNS TABLE(key int, value int) AS $$
@ -420,15 +412,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT task_tracker_partition_column_select(1); SELECT task_tracker_partition_column_select(1);
SELECT task_tracker_partition_column_select(2); SELECT task_tracker_partition_column_select(2);
SELECT task_tracker_partition_column_select(3); SELECT task_tracker_partition_column_select(3);
SELECT task_tracker_partition_column_select(4); SELECT task_tracker_partition_column_select(4);
SELECT task_tracker_partition_column_select(5); SELECT task_tracker_partition_column_select(5);
SELECT task_tracker_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- SELECT task_tracker_partition_column_select(6);
SET citus.task_executor_type TO 'real-time'; SET citus.task_executor_type TO 'real-time';
@ -445,8 +435,7 @@ SELECT partition_parameter_update(2, 21);
SELECT partition_parameter_update(3, 31); SELECT partition_parameter_update(3, 31);
SELECT partition_parameter_update(4, 41); SELECT partition_parameter_update(4, 41);
SELECT partition_parameter_update(5, 51); SELECT partition_parameter_update(5, 51);
-- This fails with an unexpected error message SELECT partition_parameter_update(6, 61);
SELECT partition_parameter_update(5, 52);
CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$ CREATE FUNCTION non_partition_parameter_update(int, int) RETURNS void as $$
BEGIN BEGIN
@ -472,13 +461,13 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- execute 6 times to trigger prepared statement usage
SELECT partition_parameter_delete(1, 11); SELECT partition_parameter_delete(1, 11);
SELECT partition_parameter_delete(2, 21); SELECT partition_parameter_delete(2, 21);
SELECT partition_parameter_delete(3, 31); SELECT partition_parameter_delete(3, 31);
SELECT partition_parameter_delete(4, 41); SELECT partition_parameter_delete(4, 41);
SELECT partition_parameter_delete(5, 51); SELECT partition_parameter_delete(5, 51);
-- This fails with an unexpected error message SELECT partition_parameter_delete(6, 61);
SELECT partition_parameter_delete(0, 10);
CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$ CREATE FUNCTION non_partition_parameter_delete(int) RETURNS void as $$
BEGIN BEGIN

View File

@ -149,8 +149,7 @@ EXECUTE prepared_test_2;
-- execute prepared statements with different parameters -- execute prepared statements with different parameters
EXECUTE prepared_test_6(155); EXECUTE prepared_test_6(155);
-- FIXME: temporarily disabled EXECUTE prepared_test_6(1555);
-- EXECUTE prepared_test_6(1555);
-- test router executor with parameterized non-partition columns -- test router executor with parameterized non-partition columns
@ -194,12 +193,12 @@ EXECUTE prepared_select(4, 40);
EXECUTE prepared_select(5, 50); EXECUTE prepared_select(5, 50);
EXECUTE prepared_select(6, 60); EXECUTE prepared_select(6, 60);
-- test that we don't crash on failing parameterized insert on the partition column -- Test that parameterized partition column for an insert is supported
PREPARE prepared_partition_column_insert(bigint) AS PREPARE prepared_partition_column_insert(bigint) AS
INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)'); INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');
-- we error out on the 6th execution -- execute 6 times to trigger prepared statement usage
EXECUTE prepared_partition_column_insert(1); EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2); EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3); EXECUTE prepared_partition_column_insert(3);
@ -282,10 +281,7 @@ EXECUTE prepared_router_partition_column_select(2);
EXECUTE prepared_router_partition_column_select(3); EXECUTE prepared_router_partition_column_select(3);
EXECUTE prepared_router_partition_column_select(4); EXECUTE prepared_router_partition_column_select(4);
EXECUTE prepared_router_partition_column_select(5); EXECUTE prepared_router_partition_column_select(5);
EXECUTE prepared_router_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- EXECUTE prepared_router_partition_column_select(6);
PREPARE prepared_router_non_partition_column_select(int) AS PREPARE prepared_router_non_partition_column_select(int) AS
SELECT SELECT
@ -325,10 +321,7 @@ EXECUTE prepared_real_time_non_partition_column_select(20);
EXECUTE prepared_real_time_non_partition_column_select(30); EXECUTE prepared_real_time_non_partition_column_select(30);
EXECUTE prepared_real_time_non_partition_column_select(40); EXECUTE prepared_real_time_non_partition_column_select(40);
EXECUTE prepared_real_time_non_partition_column_select(50); EXECUTE prepared_real_time_non_partition_column_select(50);
EXECUTE prepared_real_time_non_partition_column_select(60);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- EXECUTE prepared_real_time_non_partition_column_select(60);
PREPARE prepared_real_time_partition_column_select(int) AS PREPARE prepared_real_time_partition_column_select(int) AS
SELECT SELECT
@ -348,10 +341,7 @@ EXECUTE prepared_real_time_partition_column_select(2);
EXECUTE prepared_real_time_partition_column_select(3); EXECUTE prepared_real_time_partition_column_select(3);
EXECUTE prepared_real_time_partition_column_select(4); EXECUTE prepared_real_time_partition_column_select(4);
EXECUTE prepared_real_time_partition_column_select(5); EXECUTE prepared_real_time_partition_column_select(5);
EXECUTE prepared_real_time_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- EXECUTE prepared_real_time_partition_column_select(6);
-- check task-tracker executor -- check task-tracker executor
SET citus.task_executor_type TO 'task-tracker'; SET citus.task_executor_type TO 'task-tracker';
@ -373,10 +363,7 @@ EXECUTE prepared_task_tracker_non_partition_column_select(20);
EXECUTE prepared_task_tracker_non_partition_column_select(30); EXECUTE prepared_task_tracker_non_partition_column_select(30);
EXECUTE prepared_task_tracker_non_partition_column_select(40); EXECUTE prepared_task_tracker_non_partition_column_select(40);
EXECUTE prepared_task_tracker_non_partition_column_select(50); EXECUTE prepared_task_tracker_non_partition_column_select(50);
EXECUTE prepared_task_tracker_non_partition_column_select(60);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- EXECUTE prepared_task_tracker_non_partition_column_select(60);
PREPARE prepared_task_tracker_partition_column_select(int) AS PREPARE prepared_task_tracker_partition_column_select(int) AS
SELECT SELECT
@ -396,10 +383,7 @@ EXECUTE prepared_task_tracker_partition_column_select(2);
EXECUTE prepared_task_tracker_partition_column_select(3); EXECUTE prepared_task_tracker_partition_column_select(3);
EXECUTE prepared_task_tracker_partition_column_select(4); EXECUTE prepared_task_tracker_partition_column_select(4);
EXECUTE prepared_task_tracker_partition_column_select(5); EXECUTE prepared_task_tracker_partition_column_select(5);
EXECUTE prepared_task_tracker_partition_column_select(6);
-- FIXME: 6th execution is failing. We don't want to run the failing test
-- because of changing output. After implementing this feature, uncomment this.
-- EXECUTE prepared_task_tracker_partition_column_select(6);
SET citus.task_executor_type TO 'real-time'; SET citus.task_executor_type TO 'real-time';
@ -413,8 +397,7 @@ EXECUTE prepared_partition_parameter_update(2, 21);
EXECUTE prepared_partition_parameter_update(3, 31); EXECUTE prepared_partition_parameter_update(3, 31);
EXECUTE prepared_partition_parameter_update(4, 41); EXECUTE prepared_partition_parameter_update(4, 41);
EXECUTE prepared_partition_parameter_update(5, 51); EXECUTE prepared_partition_parameter_update(5, 51);
-- This fails with an unexpected error message EXECUTE prepared_partition_parameter_update(6, 61);
EXECUTE prepared_partition_parameter_update(5, 52);
PREPARE prepared_non_partition_parameter_update(int, int) AS PREPARE prepared_non_partition_parameter_update(int, int) AS
UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1; UPDATE prepare_table SET value = $2 WHERE key = 0 AND value = $1;
@ -439,8 +422,7 @@ EXECUTE prepared_partition_parameter_delete(2, 21);
EXECUTE prepared_partition_parameter_delete(3, 31); EXECUTE prepared_partition_parameter_delete(3, 31);
EXECUTE prepared_partition_parameter_delete(4, 41); EXECUTE prepared_partition_parameter_delete(4, 41);
EXECUTE prepared_partition_parameter_delete(5, 51); EXECUTE prepared_partition_parameter_delete(5, 51);
-- This fails with an unexpected error message EXECUTE prepared_partition_parameter_delete(6, 61);
EXECUTE prepared_partition_parameter_delete(0, 10);
PREPARE prepared_non_partition_parameter_delete(int) AS PREPARE prepared_non_partition_parameter_delete(int) AS
DELETE FROM prepare_table WHERE key = 0 AND value = $1; DELETE FROM prepare_table WHERE key = 0 AND value = $1;