diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index e831e4583..19ed656c2 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -206,18 +206,8 @@ multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR) { - Task *task = NULL; - PlannedStmt *planStatement = queryDesc->plannedstmt; - MultiPlan *multiPlan = GetMultiPlan(planStatement); - List *taskList = multiPlan->workerJob->taskList; - - /* router executor can only execute distributed plans with a single task */ - Assert(list_length(taskList) == 1); - - task = (Task *) linitial(taskList); - /* drop into the router executor */ - RouterExecutorRun(queryDesc, direction, count, task); + RouterExecutorRun(queryDesc, direction, count); } else { diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9e643b00c..961345a12 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -19,21 +19,26 @@ #include "miscadmin.h" #include "access/xact.h" +#include "distributed/citus_clauses.h" +#include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" -#include "executor/executor.h" #include "nodes/pg_list.h" +#include "optimizer/clauses.h" #include "utils/builtins.h" - #include "utils/elog.h" #include "utils/errcodes.h" #include "utils/memutils.h" #include "utils/palloc.h" #include "utils/int8.h" +#if (PG_VERSION_NUM >= 90500) +#include "utils/ruleutils.h" +#endif /* controls use of locks to enforce safe commutativity */ @@ -49,6 +54,7 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); +static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, TupleDesc tupleDescriptor, int64 *rows); @@ -179,8 +185,12 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode) * RouterExecutorRun actually executes a single task on a worker. */ void -RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task) +RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) { + PlannedStmt *planStatement = queryDesc->plannedstmt; + MultiPlan *multiPlan = GetMultiPlan(planStatement); + List *taskList = multiPlan->workerJob->taskList; + Task *task = NULL; EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; @@ -188,6 +198,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas MaterialState *routerState = (MaterialState *) queryDesc->planstate; bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; + /* router executor can only execute distributed plans with a single task */ + Assert(list_length(taskList) == 1); + task = (Task *) linitial(taskList); + Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); Assert(task != NULL); @@ -309,6 +323,22 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, ListCell *failedPlacementCell = NULL; int64 affectedTupleCount = -1; bool gotResults = false; + char *queryString = task->queryString; + + if (isModificationQuery && task->requiresMasterEvaluation) + { + PlannedStmt *planStatement = queryDesc->plannedstmt; + MultiPlan *multiPlan = GetMultiPlan(planStatement); + Query *query = multiPlan->workerJob->jobQuery; + StringInfo queryStringInfo = makeStringInfo(); + + ExecuteMasterEvaluableFunctions(query); + DeparseShardQuery(query, task, queryStringInfo); + queryString = queryStringInfo->data; + + elog(DEBUG4, "query before master evaluation: %s", task->queryString); + elog(DEBUG4, "query after master evaluation: %s", queryString); + } /* * Try to run the query to completion on one placement. If the query fails @@ -329,7 +359,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, continue; } - queryOK = SendQueryInSingleRowMode(connection, task->queryString); + queryOK = SendQueryInSingleRowMode(connection, queryString); if (!queryOK) { PurgeConnection(connection); @@ -425,6 +455,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } +static void +DeparseShardQuery(Query *query, Task *task, StringInfo queryString) +{ + uint64 shardId = task->anchorShardId; + Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid; + + deparse_shard_query(query, relid, shardId, queryString); +} + + /* * ReturnRowsFromTuplestore moves rows from a given tuplestore into a * receiver. It performs the necessary limiting to support cursors. diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 48d82e66e..6c7cf7833 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -24,6 +24,7 @@ #include "catalog/pg_class.h" #include "commands/dbcommands.h" #include "commands/event_trigger.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" @@ -124,6 +125,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) errmsg("master_modify_multiple_shards() does not support RETURNING"))); } + ExecuteMasterEvaluableFunctions(modifyQuery); + shardIntervalList = LoadShardIntervalList(relationId); restrictClauseList = WhereClauseList(modifyQuery->jointree); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 5f76b3194..14513afec 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -21,6 +21,7 @@ #include "access/skey.h" #endif #include "access/xact.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -51,11 +52,24 @@ #include "utils/errcodes.h" #include "utils/lsyscache.h" #include "utils/rel.h" -#include "utils/relcache.h" -#include "utils/typcache.h" + +#include "catalog/pg_proc.h" +#include "optimizer/planmain.h" + + +typedef struct WalkerState +{ + bool containsVar; + bool varArgument; + bool badCoalesce; +} WalkerState; /* planner functions forward declarations */ +static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, + bool *badCoalesce); +static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); +static char MostPermissiveVolatileFlag(char left, char right); static Task * RouterModifyTask(Query *query); #if (PG_VERSION_NUM >= 90500) static OnConflictExpr * RebuildOnConflict(Oid relationId, @@ -143,8 +157,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) ListCell *rangeTableCell = NULL; bool hasValuesScan = false; uint32 queryTableCount = 0; - bool hasNonConstTargetEntryExprs = false; - bool hasNonConstQualExprs = false; bool specifiesPartitionValue = false; #if (PG_VERSION_NUM >= 90500) ListCell *setTargetCell = NULL; @@ -248,6 +260,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) /* reject queries which involve multi-row inserts */ if (hasValuesScan) { + /* + * NB: If you remove this check you must also change the checks further in this + * method and ensure that VOLATILE function calls aren't allowed in INSERT + * statements. Currently they're allowed but the function call is replaced + * with a constant, and if you're inserting multiple rows at once the function + * should return a different value for each row. + */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given" " modification"), @@ -258,6 +277,8 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ FromExpr *joinTree = NULL; ListCell *targetEntryCell = NULL; @@ -271,9 +292,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) continue; } - if (contain_mutable_functions((Node *) targetEntry->expr)) + if (commandType == CMD_UPDATE && + contain_volatile_functions((Node *) targetEntry->expr)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in UPDATE queries on distributed " + "tables must not be VOLATILE"))); } if (commandType == CMD_UPDATE && @@ -281,17 +305,59 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { specifiesPartitionValue = true; } + + if (targetEntry->resno == partitionColumn->varattno && + !IsA(targetEntry->expr, Const)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("values given for the partition column must be" + " constants or constant expressions"))); + } + + if (commandType == CMD_UPDATE && + MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } } joinTree = queryTree->jointree; - if (joinTree != NULL && contain_mutable_functions(joinTree->quals)) + if (joinTree != NULL) { - hasNonConstQualExprs = true; + if (contain_volatile_functions(joinTree->quals)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the WHERE clause of " + "modification queries on distributed tables " + "must not be VOLATILE"))); + } + else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, + &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("STABLE functions used in UPDATE queries" + " cannot be called with column references"))); + } + + if (hasBadCoalesce) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("non-IMMUTABLE functions are not allowed in CASE or" + " COALESCE statements"))); } if (contain_mutable_functions((Node *) queryTree->returningList)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("non-IMMUTABLE functions are not allowed in the" + " RETURNING clause"))); } } @@ -342,7 +408,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } else if (contain_mutable_functions((Node *) setTargetEntry->expr)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the DO UPDATE SET clause of " + "INSERTs on distributed tables must be marked " + "IMMUTABLE"))); } } } @@ -351,17 +420,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (contain_mutable_functions((Node *) arbiterWhere) || contain_mutable_functions((Node *) onConflictWhere)) { - hasNonConstQualExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the WHERE clause of the ON CONFLICT " + "clause of INSERTs on distributed tables must be marked " + "IMMUTABLE"))); } #endif - if (hasNonConstTargetEntryExprs || hasNonConstQualExprs) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in modification queries on distributed " - "tables must be marked IMMUTABLE"))); - } - if (specifiesPartitionValue) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -370,6 +435,246 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } +/* + * If the expression contains STABLE functions which accept any parameters derived from a + * Var returns true and sets varArgument. + * + * If the expression contains a CASE or COALESCE which invoke non-IMMUTABLE functions + * returns true and sets badCoalesce. + * + * Assumes the expression contains no VOLATILE functions. + * + * Var's are allowed, but only if they are passed solely to IMMUTABLE functions + * + * We special-case CASE/COALESCE because those are evaluated lazily. We could evaluate + * CASE/COALESCE expressions which don't reference Vars, or partially evaluate some + * which do, but for now we just error out. That makes both the code and user-education + * easier. + */ +static bool +MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce) +{ + bool result; + WalkerState data; + data.containsVar = data.varArgument = data.badCoalesce = false; + + result = MasterIrreducibleExpressionWalker(expression, &data); + + *varArgument |= data.varArgument; + *badCoalesce |= data.badCoalesce; + return result; +} + + +static bool +MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) +{ + char volatileFlag = 0; + WalkerState childState = { false, false, false }; + bool containsDisallowedFunction = false; + + if (expression == NULL) + { + return false; + } + + if (IsA(expression, CoalesceExpr)) + { + CoalesceExpr *expr = (CoalesceExpr *) expression; + + if (contain_mutable_functions((Node *) (expr->args))) + { + state->badCoalesce = true; + return true; + } + else + { + /* + * There's no need to recurse. Since there are no STABLE functions + * varArgument will never be set. + */ + return false; + } + } + + if (IsA(expression, CaseExpr)) + { + if (contain_mutable_functions(expression)) + { + state->badCoalesce = true; + return true; + } + + return false; + } + + if (IsA(expression, Var)) + { + state->containsVar = true; + return false; + } + + /* + * In order for statement replication to give us consistent results it's important + * that we either disallow or evaluate on the master anything which has a volatility + * category above IMMUTABLE. Newer versions of postgres might add node types which + * should be checked in this function. + * + * Look through contain_mutable_functions_walker or future PG's equivalent for new + * node types before bumping this version number to fix compilation. + * + * Once you've added them to this check, make sure you also evaluate them in the + * executor! + */ + StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section" + " needs to be reviewed."); + + if (IsA(expression, OpExpr)) + { + OpExpr *expr = (OpExpr *) expression; + + set_opfuncid(expr); + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, FuncExpr)) + { + FuncExpr *expr = (FuncExpr *) expression; + + volatileFlag = func_volatile(expr->funcid); + } + else if (IsA(expression, DistinctExpr)) + { + /* + * to exercise this, you need to create a custom type for which the '=' operator + * is STABLE/VOLATILE + */ + DistinctExpr *expr = (DistinctExpr *) expression; + + set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, NullIfExpr)) + { + /* + * same as above, exercising this requires a STABLE/VOLATILE '=' operator + */ + NullIfExpr *expr = (NullIfExpr *) expression; + + set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, ScalarArrayOpExpr)) + { + /* + * to exercise this you need to CREATE OPERATOR with a binary predicate + * and use it within an ANY/ALL clause. + */ + ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression; + + set_sa_opfuncid(expr); + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, CoerceViaIO)) + { + /* + * to exercise this you need to use a type with a STABLE/VOLATILE intype or + * outtype. + */ + CoerceViaIO *expr = (CoerceViaIO *) expression; + Oid iofunc; + Oid typioparam; + bool typisvarlena; + + /* check the result type's input function */ + getTypeInputInfo(expr->resulttype, + &iofunc, &typioparam); + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc)); + + /* check the input type's output function */ + getTypeOutputInfo(exprType((Node *) expr->arg), + &iofunc, &typisvarlena); + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc)); + } + else if (IsA(expression, ArrayCoerceExpr)) + { + ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression; + + if (OidIsValid(expr->elemfuncid)) + { + volatileFlag = func_volatile(expr->elemfuncid); + } + } + else if (IsA(expression, RowCompareExpr)) + { + RowCompareExpr *rcexpr = (RowCompareExpr *) expression; + ListCell *opid; + + foreach(opid, rcexpr->opnos) + { + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, + op_volatile(lfirst_oid(opid))); + } + } + else if (IsA(expression, Query)) + { + /* subqueries aren't allowed and fail before control reaches this point */ + Assert(false); + } + + if (volatileFlag == PROVOLATILE_VOLATILE) + { + /* the caller should have already checked for this */ + Assert(false); + } + else if (volatileFlag == PROVOLATILE_STABLE) + { + containsDisallowedFunction = + expression_tree_walker(expression, + MasterIrreducibleExpressionWalker, + &childState); + + if (childState.containsVar) + { + state->varArgument = true; + } + + state->badCoalesce |= childState.badCoalesce; + state->varArgument |= childState.varArgument; + + return (containsDisallowedFunction || childState.containsVar); + } + + /* keep traversing */ + return expression_tree_walker(expression, + MasterIrreducibleExpressionWalker, + state); +} + + +/* + * Return the most-pessimistic volatility flag of the two params. + * + * for example: given two flags, if one is stable and one is volatile, an expression + * involving both is volatile. + */ +char +MostPermissiveVolatileFlag(char left, char right) +{ + if (left == PROVOLATILE_VOLATILE || right == PROVOLATILE_VOLATILE) + { + return PROVOLATILE_VOLATILE; + } + else if (left == PROVOLATILE_STABLE || right == PROVOLATILE_STABLE) + { + return PROVOLATILE_STABLE; + } + else + { + return PROVOLATILE_IMMUTABLE; + } +} + + /* * RouterModifyTask builds a Task to represent a modification performed by * the provided query against the provided shard interval. This task contains @@ -384,6 +689,7 @@ RouterModifyTask(Query *query) StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; + bool requiresMasterEvaluation = RequiresMasterEvaluation(query); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -446,6 +752,7 @@ RouterModifyTask(Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; + modifyTask->requiresMasterEvaluation = requiresMasterEvaluation; return modifyTask; } @@ -616,10 +923,10 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod) /* - * FastShardPruning is a higher level API for FindShardInterval function. Given the relationId - * of the distributed table and partitionValue, FastShardPruning function finds the corresponding - * shard interval that the partitionValue should be in. FastShardPruning returns NULL if no - * ShardIntervals exist for the given partitionValue. + * FastShardPruning is a higher level API for FindShardInterval function. Given the + * relationId of the distributed table and partitionValue, FastShardPruning function finds + * the corresponding shard interval that the partitionValue should be in. FastShardPruning + * returns NULL if no ShardIntervals exist for the given partitionValue. */ static ShardInterval * FastShardPruning(Oid distributedTableId, Const *partitionValue) @@ -806,6 +1113,7 @@ RouterSelectTask(Query *query) task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; + task->requiresMasterEvaluation = false; return task; } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c new file mode 100644 index 000000000..521ad3aa7 --- /dev/null +++ b/src/backend/distributed/utils/citus_clauses.c @@ -0,0 +1,324 @@ +/* + * citus_clauses.c + * + * Routines roughly equivalent to postgres' util/clauses. + * + * Copyright (c) 2016-2016, Citus Data, Inc. + */ + +#include "postgres.h" + +#include "distributed/citus_clauses.h" + +#include "catalog/pg_type.h" +#include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/primnodes.h" +#include "optimizer/clauses.h" +#include "optimizer/planmain.h" +#include "utils/datum.h" +#include "utils/lsyscache.h" + +static Node * PartiallyEvaluateExpression(Node *expression); +static Node * EvaluateNodeIfReferencesFunction(Node *expression); +static Node * PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar); +static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, + Oid result_collation); + + +/* + * Whether the executor needs to reparse and try to execute this query. + */ +bool +RequiresMasterEvaluation(Query *query) +{ + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (contain_mutable_functions((Node *) targetEntry->expr)) + { + return true; + } + } + + if (query->jointree && query->jointree->quals) + { + return contain_mutable_functions((Node *) query->jointree->quals); + } + + return false; +} + + +/* + * Looks at each TargetEntry of the query and the jointree quals, evaluating + * any sub-expressions which don't include Vars. + */ +void +ExecuteMasterEvaluableFunctions(Query *query) +{ + CmdType commandType = query->commandType; + ListCell *targetEntryCell = NULL; + Node *modifiedNode = NULL; + + if (query->jointree && query->jointree->quals) + { + query->jointree->quals = PartiallyEvaluateExpression(query->jointree->quals); + } + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* performance optimization for the most common cases */ + if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) + { + continue; + } + + if (commandType == CMD_INSERT) + { + modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr); + } + else + { + modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr); + } + + targetEntry->expr = (Expr *) modifiedNode; + } + + if (query->jointree) + { + Assert(!contain_mutable_functions((Node *) (query->jointree->quals))); + } + Assert(!contain_mutable_functions((Node *) (query->targetList))); +} + + +/* + * Walks the expression evaluating any node which invokes a function as long as a Var + * doesn't show up in the parameter list. + */ +static Node * +PartiallyEvaluateExpression(Node *expression) +{ + bool unused; + return PartiallyEvaluateExpressionMutator(expression, &unused); +} + + +/* + * When you find a function call evaluate it, the planner made sure there were no Vars. + * + * Tell your parent if either you or one if your children is a Var. + * + * A little inefficient. It goes to the bottom of the tree then calls EvaluateExpression + * on each function on the way back up. Say we had an expression with no Vars, we could + * only call EvaluateExpression on the top-most level and get the same result. + */ +static Node * +PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar) +{ + bool childContainsVar = false; + Node *copy = NULL; + + if (expression == NULL) + { + return expression; + } + + /* pass any argument lists back to the mutator to copy and recurse for us */ + if (IsA(expression, List)) + { + return expression_tree_mutator(expression, + PartiallyEvaluateExpressionMutator, + containsVar); + } + + if (IsA(expression, Var)) + { + *containsVar = true; + + /* makes a copy for us */ + return expression_tree_mutator(expression, + PartiallyEvaluateExpressionMutator, + containsVar); + } + + copy = expression_tree_mutator(expression, + PartiallyEvaluateExpressionMutator, + &childContainsVar); + + if (childContainsVar) + { + *containsVar = true; + } + else + { + copy = EvaluateNodeIfReferencesFunction(copy); + } + + return copy; +} + + +/* + * Used to evaluate functions during queries on the master before sending them to workers + * + * The idea isn't to evaluate every kind of expression, just the kinds whoes result might + * change between invocations (the idea is to allow users to use functions but still have + * consistent shard replicas, since we use statement replication). This means evaluating + * all nodes which invoke functions which might not be IMMUTABLE. + */ +static Node * +EvaluateNodeIfReferencesFunction(Node *expression) +{ + if (IsA(expression, FuncExpr)) + { + FuncExpr *expr = (FuncExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->funcresulttype, + exprTypmod((Node *) expr), + expr->funccollid); + } + + if (IsA(expression, OpExpr) || + IsA(expression, DistinctExpr) || + IsA(expression, NullIfExpr)) + { + /* structural equivalence */ + OpExpr *expr = (OpExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->opresulttype, -1, + expr->opcollid); + } + + if (IsA(expression, CoerceViaIO)) + { + CoerceViaIO *expr = (CoerceViaIO *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->resulttype, -1, + expr->resultcollid); + } + + if (IsA(expression, ArrayCoerceExpr)) + { + ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->resulttype, + expr->resulttypmod, + expr->resultcollid); + } + + if (IsA(expression, ScalarArrayOpExpr)) + { + ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); + } + + if (IsA(expression, RowCompareExpr)) + { + RowCompareExpr *expr = (RowCompareExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); + } + + return expression; +} + + +/* + * a copy of pg's evaluate_expr, pre-evaluate a constant expression + * + * We use the executor's routine ExecEvalExpr() to avoid duplication of + * code and ensure we get the same result as the executor would get. + * + * *INDENT-OFF* + */ +static Expr * +citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, + Oid result_collation) +{ + EState *estate; + ExprState *exprstate; + MemoryContext oldcontext; + Datum const_val; + bool const_is_null; + int16 resultTypLen; + bool resultTypByVal; + + /* + * To use the executor, we need an EState. + */ + estate = CreateExecutorState(); + + /* We can use the estate's working context to avoid memory leaks. */ + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* Make sure any opfuncids are filled in. */ + fix_opfuncids((Node *) expr); + + /* + * Prepare expr for execution. (Note: we can't use ExecPrepareExpr + * because it'd result in recursively invoking eval_const_expressions.) + */ + exprstate = ExecInitExpr(expr, NULL); + + /* + * And evaluate it. + * + * It is OK to use a default econtext because none of the ExecEvalExpr() + * code used in this situation will use econtext. That might seem + * fortuitous, but it's not so unreasonable --- a constant expression does + * not depend on context, by definition, n'est ce pas? + */ + const_val = ExecEvalExprSwitchContext(exprstate, + GetPerTupleExprContext(estate), + &const_is_null, NULL); + + /* Get info needed about result datatype */ + get_typlenbyval(result_type, &resultTypLen, &resultTypByVal); + + /* Get back to outer memory context */ + MemoryContextSwitchTo(oldcontext); + + /* + * Must copy result out of sub-context used by expression eval. + * + * Also, if it's varlena, forcibly detoast it. This protects us against + * storing TOAST pointers into plans that might outlive the referenced + * data. (makeConst would handle detoasting anyway, but it's worth a few + * extra lines here so that we can do the copy and detoast in one step.) + */ + if (!const_is_null) + { + if (resultTypLen == -1) + const_val = PointerGetDatum(PG_DETOAST_DATUM_COPY(const_val)); + else + const_val = datumCopy(const_val, resultTypByVal, resultTypLen); + } + + /* Release all the junk we just created */ + FreeExecutorState(estate); + + /* + * Make the constant result node. + */ + return (Expr *) makeConst(result_type, result_typmod, result_collation, + resultTypLen, + const_val, const_is_null, + resultTypByVal); +} + +/* *INDENT-ON* */ diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 2510cc28c..124185ff4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -458,6 +458,7 @@ _outTask(StringInfo str, const Task *node) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); + WRITE_BOOL_FIELD(requiresMasterEvaluation); } diff --git a/src/backend/distributed/utils/citus_readfuncs_94.c b/src/backend/distributed/utils/citus_readfuncs_94.c index 4be2d093f..d7cb9d61f 100644 --- a/src/backend/distributed/utils/citus_readfuncs_94.c +++ b/src/backend/distributed/utils/citus_readfuncs_94.c @@ -1418,6 +1418,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index 89fc16753..caf223c00 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1507,6 +1507,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h new file mode 100644 index 000000000..29905850a --- /dev/null +++ b/src/include/distributed/citus_clauses.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * citus_clauses.h + * Routines roughly equivalent to postgres' util/clauses. + * + * Copyright (c) 2012-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_CLAUSES_H +#define CITUS_CLAUSES_H + +#include "nodes/nodes.h" +#include "nodes/parsenodes.h" + +extern bool RequiresMasterEvaluation(Query *query); +extern void ExecuteMasterEvaluableFunctions(Query *query); + +#endif /* CITUS_CLAUSES_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ff769d18e..032b8f49f 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -141,6 +141,9 @@ typedef struct MapMergeJob * as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data * fetch tasks. We also forward declare the task execution struct here to avoid * including the executor header files. + * + * NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask + * in citus_readfuncs to correctly (de)serialize this struct. */ typedef struct TaskExecution TaskExecution; @@ -156,12 +159,13 @@ typedef struct Task List *dependedTaskList; /* only applies to compute tasks */ uint32 partitionId; - uint32 upstreamTaskId; /* only applies to data fetch tasks */ - ShardInterval *shardInterval; /* only applies to merge tasks */ - bool assignmentConstrained; /* only applies to merge tasks */ - uint64 shardId; /* only applies to shard fetch tasks */ - TaskExecution *taskExecution; /* used by task tracker executor */ - bool upsertQuery; /* only applies to modify tasks */ + uint32 upstreamTaskId; /* only applies to data fetch tasks */ + ShardInterval *shardInterval; /* only applies to merge tasks */ + bool assignmentConstrained; /* only applies to merge tasks */ + uint64 shardId; /* only applies to shard fetch tasks */ + TaskExecution *taskExecution; /* used by task tracker executor */ + bool upsertQuery; /* only applies to modify tasks */ + bool requiresMasterEvaluation; /* only applies to modify tasks */ } Task; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index f18af7686..e01540237 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -16,8 +16,7 @@ extern bool AllModificationsCommutative; extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); -extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, - Task *task); +extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); diff --git a/src/test/regress/expected/multi_function_evaluation.out b/src/test/regress/expected/multi_function_evaluation.out new file mode 100644 index 000000000..d451e4ad8 --- /dev/null +++ b/src/test/regress/expected/multi_function_evaluation.out @@ -0,0 +1,117 @@ +-- +-- MULTI_FUNCTION_EVALUATION +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000; +-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL) +CREATE TABLE example (key INT, value INT); +SELECT master_create_distributed_table('example', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +CREATE SEQUENCE example_value_seq; +SELECT master_create_worker_shards('example', 1, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO example VALUES (1, nextval('example_value_seq')); +SELECT * FROM example; + key | value +-----+------- + 1 | 1 +(1 row) + +-- functions called by prepared statements are also evaluated +PREPARE stmt AS INSERT INTO example VALUES (2); +EXECUTE stmt; +EXECUTE stmt; +SELECT * FROM example; + key | value +-----+------- + 1 | 1 + 2 | + 2 | +(3 rows) + +-- non-immutable functions inside CASE/COALESCE aren't allowed +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamp; +-- this is allowed because there are no mutable funcs in the CASE +UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1; +-- this is allowed because the planner strips away the CASE during constant evaluation +UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1; +-- this is not allowed because there're mutable functions in a CaseWhen clause +-- (which we can't easily evaluate on the master) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- make sure we also check defresult (the ELSE clause) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- COALESCE is allowed +UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1; +-- COALESCE is not allowed if there are any mutable functions +UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- RowCompareExpr's are checked for mutability. These are allowed: +ALTER TABLE example DROP value; +ALTER TABLE example ADD value boolean; +ALTER TABLE example ADD time_col timestamptz; +UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1; +UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1; +-- But this RowCompareExpr is not (it passes Var into STABLE) +UPDATE example SET value = NULLIF( + ROW(date '10-10-1000', 2) < ROW(time_col, 3), true +) WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- DistinctExpr's are also checked for mutability. These are allowed: +UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1; +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1; +-- But this RowCompare references the STABLE = (date, timestamptz) operator +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed +UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1; +-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator +UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- CoerceViaIO (typoutput -> typinput, a type coercion) +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date; +-- this one is allowed +UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1; +-- this one is not +UPDATE example SET value = time_col::date WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- ArrayCoerceExpr (applies elemfuncid to each elem) +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date[]; +-- this one is allowed +UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1; +-- this one is not +UPDATE example SET value = array[time_col]::date[] WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- test that UPDATE and DELETE also have the functions in WHERE evaluated +ALTER TABLE example DROP time_col; +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamptz; +INSERT INTO example VALUES (3, now()); +UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + key | value +-----+------------------------------ + 3 | Tue Oct 10 00:00:00 2000 PDT +(1 row) + +DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + key | value +-----+------- +(0 rows) + +DROP TABLE example; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index f3087ed9b..a69e6d80a 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -192,16 +192,15 @@ SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with expressions that cannot be collapsed are unsupported +ERROR: values given for the partition column must be constants or constant expressions +-- values for other columns are totally fine INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +-- commands with mutable but non-volatile functions(ie: stable func.) in their quals +-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) +DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given modification @@ -433,12 +432,46 @@ UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWE 246 | gm | GM (1 row) --- updates referencing non-IMMUTABLE functions are unsupported -UPDATE limit_orders SET placed_at = now() WHERE id = 246; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ALTER TABLE limit_orders ADD COLUMN array_of_values integer[]; +-- updates referencing STABLE functions are allowed +UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246; +-- so are binary operators +UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246; +CREATE FUNCTION immutable_append(old_values int[], new_value int) +RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; +-- immutable function calls with vars are also allowed +UPDATE limit_orders +SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246; +CREATE FUNCTION stable_append(old_values int[], new_value int) +RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$ +LANGUAGE plpgsql STABLE; +-- but STABLE function calls with vars are not allowed +UPDATE limit_orders +SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +SELECT array_of_values FROM limit_orders WHERE id = 246; + array_of_values +----------------- + {1,2} +(1 row) + +-- STRICT functions work as expected +CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS +'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT; +UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246; +ERROR: null value in column "bidder_id" violates not-null constraint +DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}). +CONTEXT: while executing command on localhost:57637 +SELECT array_of_values FROM limit_orders WHERE id = 246; + array_of_values +----------------- + {1,2} +(1 row) + +ALTER TABLE limit_orders DROP array_of_values; -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 0d4e389d6..1394563d3 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -34,13 +34,9 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table ERROR: relation "temporary_nondistributed_table" is not a distributed table -- commands with volatile functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with stable functions in their quals -CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; -SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE -- commands with immutable functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); master_modify_multiple_shards @@ -235,9 +231,14 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; 47 (1 row) +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; -- updates referencing non-IMMUTABLE functions are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + -- updates referencing IMMUTABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); master_modify_multiple_shards @@ -251,10 +252,21 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; 78 (1 row) --- updates referencing STABLE functions in SET section are not supported +-- updates referencing STABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + -- updates referencing VOLATILE functions in SET section are not supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +-- commands with stable functions in their quals are allowed +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 3b3ff22c1..665f6e65d 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -242,15 +242,15 @@ DETAIL: Subqueries are not supported in distributed modifications. -- non mutable function call in the SET INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = random()::int; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE -- non mutable function call in the WHERE INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE -- non mutable function call in the arbiter WHERE INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int DO UPDATE SET other_col = 5; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE -- error out on attempt to update the partition key INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET part_key = 15; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ed60ff120..5f725e68b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -154,3 +154,8 @@ test: multi_drop_extension # multi_schema_support makes sure we can work with tables in schemas other than public with no problem # ---------- test: multi_schema_support + +# ---------- +# multi_function_evaluation tests edge-cases in master-side function pre-evaluation +# ---------- +test: multi_function_evaluation diff --git a/src/test/regress/sql/multi_function_evaluation.sql b/src/test/regress/sql/multi_function_evaluation.sql new file mode 100644 index 000000000..f31f973ef --- /dev/null +++ b/src/test/regress/sql/multi_function_evaluation.sql @@ -0,0 +1,114 @@ +-- +-- MULTI_FUNCTION_EVALUATION +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000; + +-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL) + +CREATE TABLE example (key INT, value INT); +SELECT master_create_distributed_table('example', 'key', 'hash'); +CREATE SEQUENCE example_value_seq; +SELECT master_create_worker_shards('example', 1, 2); +INSERT INTO example VALUES (1, nextval('example_value_seq')); +SELECT * FROM example; + +-- functions called by prepared statements are also evaluated + +PREPARE stmt AS INSERT INTO example VALUES (2); +EXECUTE stmt; +EXECUTE stmt; +SELECT * FROM example; + +-- non-immutable functions inside CASE/COALESCE aren't allowed + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamp; + +-- this is allowed because there are no mutable funcs in the CASE +UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1; + +-- this is allowed because the planner strips away the CASE during constant evaluation +UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1; + +-- this is not allowed because there're mutable functions in a CaseWhen clause +-- (which we can't easily evaluate on the master) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1; + +-- make sure we also check defresult (the ELSE clause) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1; + +-- COALESCE is allowed +UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1; + +-- COALESCE is not allowed if there are any mutable functions +UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1; +UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1; + +-- RowCompareExpr's are checked for mutability. These are allowed: + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value boolean; +ALTER TABLE example ADD time_col timestamptz; + +UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1; +UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1; + +-- But this RowCompareExpr is not (it passes Var into STABLE) + +UPDATE example SET value = NULLIF( + ROW(date '10-10-1000', 2) < ROW(time_col, 3), true +) WHERE key = 1; + +-- DistinctExpr's are also checked for mutability. These are allowed: + +UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1; +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1; + +-- But this RowCompare references the STABLE = (date, timestamptz) operator + +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1; + +-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed + +UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1; + +-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator + +UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1; + +-- CoerceViaIO (typoutput -> typinput, a type coercion) + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date; + +-- this one is allowed +UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1; +-- this one is not +UPDATE example SET value = time_col::date WHERE key = 1; + +-- ArrayCoerceExpr (applies elemfuncid to each elem) + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date[]; + +-- this one is allowed +UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1; +-- this one is not +UPDATE example SET value = array[time_col]::date[] WHERE key = 1; + +-- test that UPDATE and DELETE also have the functions in WHERE evaluated + +ALTER TABLE example DROP time_col; +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamptz; + +INSERT INTO example VALUES (3, now()); +UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + +DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + +DROP TABLE example; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 0ff747cb7..1e3c806f3 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -137,14 +137,15 @@ SET client_min_messages TO DEFAULT; INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); --- commands with expressions that cannot be collapsed are unsupported +-- values for other columns are totally fine INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); --- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; +-- commands with mutable but non-volatile functions(ie: stable func.) in their quals +-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) +DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); @@ -310,8 +311,38 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; -- IMMUTABLE functions are allowed -- even in returning UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; --- updates referencing non-IMMUTABLE functions are unsupported -UPDATE limit_orders SET placed_at = now() WHERE id = 246; +ALTER TABLE limit_orders ADD COLUMN array_of_values integer[]; + +-- updates referencing STABLE functions are allowed +UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246; +-- so are binary operators +UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246; + +CREATE FUNCTION immutable_append(old_values int[], new_value int) +RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; + +-- immutable function calls with vars are also allowed +UPDATE limit_orders +SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246; + +CREATE FUNCTION stable_append(old_values int[], new_value int) +RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$ +LANGUAGE plpgsql STABLE; + +-- but STABLE function calls with vars are not allowed +UPDATE limit_orders +SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246; + +SELECT array_of_values FROM limit_orders WHERE id = 246; + +-- STRICT functions work as expected +CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS +'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT; +UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246; + +SELECT array_of_values FROM limit_orders WHERE id = 246; + +ALTER TABLE limit_orders DROP array_of_values; -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql index 65d28a7d3..f8bd8f88c 100644 --- a/src/test/regress/sql/multi_shard_modify.sql +++ b/src/test/regress/sql/multi_shard_modify.sql @@ -60,10 +60,6 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); --- commands with stable functions in their quals -CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; -SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); - -- commands with immutable functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); @@ -140,6 +136,8 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10'); SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; + -- updates referencing non-IMMUTABLE functions are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); @@ -147,10 +145,13 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; --- updates referencing STABLE functions in SET section are not supported +-- updates referencing STABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); -- updates referencing VOLATILE functions in SET section are not supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); +-- commands with stable functions in their quals are allowed +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); + ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;