From e73b4ac026709eeb8acf93d24e2327abd78749c3 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Tue, 3 May 2016 00:33:08 -0700 Subject: [PATCH 1/6] Evaluate functions on the master - Enables using VOLATILE functions (like nextval()) in INSERT queries - Enables using STABLE functions (like now()) in targetLists and joinTrees UPDATE and INSERT can now contain non-immutable functions. INSERT can contain any kind of expression, while UPDATE can contain any STABLE function, so long as a Var is not passed into the STABLE function, even indirectly. UPDATE TargetEntry's can now also include Vars. There's an exception, CASE/COALESCE statements may not contain mutable functions. Function calls in master_modify_multiple_shards are also evaluated. --- .../distributed/executor/multi_executor.c | 12 +- .../executor/multi_router_executor.c | 49 ++- .../master/master_modify_multiple_shards.c | 3 + .../planner/multi_router_planner.c | 352 +++++++++++++++++- src/backend/distributed/utils/citus_clauses.c | 310 +++++++++++++++ src/include/distributed/citus_clauses.h | 19 + .../distributed/multi_router_executor.h | 3 +- .../expected/multi_function_evaluation.out | 117 ++++++ .../regress/expected/multi_modifications.out | 55 ++- .../regress/expected/multi_shard_modify.out | 32 +- src/test/regress/expected/multi_upsert.out | 6 +- src/test/regress/multi_schedule | 5 + .../regress/sql/multi_function_evaluation.sql | 114 ++++++ src/test/regress/sql/multi_modifications.sql | 41 +- src/test/regress/sql/multi_shard_modify.sql | 11 +- 15 files changed, 1059 insertions(+), 70 deletions(-) create mode 100644 src/backend/distributed/utils/citus_clauses.c create mode 100644 src/include/distributed/citus_clauses.h create mode 100644 src/test/regress/expected/multi_function_evaluation.out create mode 100644 src/test/regress/sql/multi_function_evaluation.sql diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index e831e4583..19ed656c2 
100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -206,18 +206,8 @@ multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR) { - Task *task = NULL; - PlannedStmt *planStatement = queryDesc->plannedstmt; - MultiPlan *multiPlan = GetMultiPlan(planStatement); - List *taskList = multiPlan->workerJob->taskList; - - /* router executor can only execute distributed plans with a single task */ - Assert(list_length(taskList) == 1); - - task = (Task *) linitial(taskList); - /* drop into the router executor */ - RouterExecutorRun(queryDesc, direction, count, task); + RouterExecutorRun(queryDesc, direction, count); } else { diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9e643b00c..78931683d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -19,27 +19,31 @@ #include "miscadmin.h" #include "access/xact.h" +#include "distributed/citus_clauses.h" +#include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" -#include "executor/executor.h" #include "nodes/pg_list.h" +#include "optimizer/clauses.h" #include "utils/builtins.h" - #include "utils/elog.h" #include "utils/errcodes.h" #include "utils/memutils.h" #include "utils/palloc.h" #include "utils/int8.h" +#if (PG_VERSION_NUM >= 90500) +#include "utils/ruleutils.h" +#endif /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; - static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static 
void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, @@ -49,6 +53,7 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); +static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, TupleDesc tupleDescriptor, int64 *rows); @@ -179,8 +184,12 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode) * RouterExecutorRun actually executes a single task on a worker. */ void -RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task) +RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) { + PlannedStmt *planStatement = queryDesc->plannedstmt; + MultiPlan *multiPlan = GetMultiPlan(planStatement); + List *taskList = multiPlan->workerJob->taskList; + Task *task = NULL; EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; @@ -188,6 +197,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas MaterialState *routerState = (MaterialState *) queryDesc->planstate; bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; + /* router executor can only execute distributed plans with a single task */ + Assert(list_length(taskList) == 1); + task = (Task *) linitial(taskList); + Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); Assert(task != NULL); @@ -309,6 +322,22 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, ListCell *failedPlacementCell = NULL; int64 affectedTupleCount = -1; bool gotResults = false; + char *queryString = task->queryString; + + if (isModificationQuery) + { + 
PlannedStmt *planStatement = queryDesc->plannedstmt; + MultiPlan *multiPlan = GetMultiPlan(planStatement); + Query *query = multiPlan->workerJob->jobQuery; + StringInfo queryStringInfo = makeStringInfo(); + + ExecuteFunctions(query); + DeparseShardQuery(query, task, queryStringInfo); + queryString = queryStringInfo->data; + + elog(DEBUG4, "old query: %s", task->queryString); + elog(DEBUG4, "new query: %s", queryString); + } /* * Try to run the query to completion on one placement. If the query fails @@ -329,7 +358,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, continue; } - queryOK = SendQueryInSingleRowMode(connection, task->queryString); + queryOK = SendQueryInSingleRowMode(connection, queryString); if (!queryOK) { PurgeConnection(connection); @@ -425,6 +454,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } +static void +DeparseShardQuery(Query *query, Task *task, StringInfo queryString) +{ + uint64 shardId = task->anchorShardId; + Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid; + + deparse_shard_query(query, relid, shardId, queryString); +} + + /* * ReturnRowsFromTuplestore moves rows from a given tuplestore into a * receiver. It performs the necessary limiting to support cursors. 
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 48d82e66e..74221bb42 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -24,6 +24,7 @@ #include "catalog/pg_class.h" #include "commands/dbcommands.h" #include "commands/event_trigger.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" @@ -124,6 +125,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) errmsg("master_modify_multiple_shards() does not support RETURNING"))); } + ExecuteFunctions(modifyQuery); + shardIntervalList = LoadShardIntervalList(relationId); restrictClauseList = WhereClauseList(modifyQuery->jointree); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 5f76b3194..595735ef7 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -51,11 +51,23 @@ #include "utils/errcodes.h" #include "utils/lsyscache.h" #include "utils/rel.h" -#include "utils/relcache.h" -#include "utils/typcache.h" + +#include "catalog/pg_proc.h" +#include "optimizer/planmain.h" + + +typedef struct { + bool containsVar; + bool varArgument; + bool badCoalesce; +} WalkerState; /* planner functions forward declarations */ +static bool ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, + bool *badCoalesce); +static bool ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state); +static char MostPermissiveVolatileFlag(char left, char right); static Task * RouterModifyTask(Query *query); #if (PG_VERSION_NUM >= 90500) static OnConflictExpr * RebuildOnConflict(Oid relationId, @@ -143,8 +155,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) ListCell 
*rangeTableCell = NULL; bool hasValuesScan = false; uint32 queryTableCount = 0; - bool hasNonConstTargetEntryExprs = false; - bool hasNonConstQualExprs = false; bool specifiesPartitionValue = false; #if (PG_VERSION_NUM >= 90500) ListCell *setTargetCell = NULL; @@ -258,6 +268,8 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { + bool hasVarArgument = false; /* A STABLE function is passed a Var argument */ + bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */ FromExpr *joinTree = NULL; ListCell *targetEntryCell = NULL; @@ -271,9 +283,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) continue; } - if (contain_mutable_functions((Node *) targetEntry->expr)) + if (commandType == CMD_UPDATE && + contain_volatile_functions((Node *) targetEntry->expr)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in UPDATE queries on distributed " + "tables must not be VOLATILE"))); } if (commandType == CMD_UPDATE && @@ -281,17 +296,58 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { specifiesPartitionValue = true; } + + if (targetEntry->resno == partitionColumn->varattno && + !IsA(targetEntry->expr, Const)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("values given for the partition column must be" + " constants or constant expressions"))); + } + + if (commandType == CMD_UPDATE && + ContainsDisallowedFunctionCalls((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } } joinTree = queryTree->jointree; - if (joinTree != NULL && contain_mutable_functions(joinTree->quals)) + if (joinTree != NULL) { - hasNonConstQualExprs = true; + if (contain_volatile_functions(joinTree->quals)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the WHERE clause of modification " + 
"queries on distributed tables must not be VOLATILE"))); + } + else if (ContainsDisallowedFunctionCalls(joinTree->quals, &hasVarArgument, + &hasBadCoalesce)) + { + Assert(hasVarArgument || hasBadCoalesce); + } + } + + if (hasVarArgument) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("STABLE functions used in UPDATE queries" + " cannot be called with column references"))); + } + + if (hasBadCoalesce) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("non-IMMUTABLE functions are not allowed in CASE or" + " COALESCE statements"))); } if (contain_mutable_functions((Node *) queryTree->returningList)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("non-IMMUTABLE functions are not allowed in the" + " RETURNING clause"))); } } @@ -342,7 +398,9 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } else if (contain_mutable_functions((Node *) setTargetEntry->expr)) { - hasNonConstTargetEntryExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the DO UPDATE SET clause of INSERTs " + "on distributed tables must be marked IMMUTABLE"))); } } } @@ -351,17 +409,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (contain_mutable_functions((Node *) arbiterWhere) || contain_mutable_functions((Node *) onConflictWhere)) { - hasNonConstQualExprs = true; + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("functions used in the WHERE clause of the ON CONFLICT " + "clause of INSERTs on distributed tables must be marked " + "IMMUTABLE"))); } #endif - if (hasNonConstTargetEntryExprs || hasNonConstQualExprs) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in modification queries on distributed " - "tables must be marked IMMUTABLE"))); - } - if (specifiesPartitionValue) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -370,6 +424,268 @@ 
ErrorIfModifyQueryNotSupported(Query *queryTree) } +/* + * If the expression contains STABLE functions which accept any parameters derived from a + * Var returns true and sets varArgument. + * + * If the expression contains a CASE or COALESCE which invoke non-IMMUTABLE functions + * returns true and sets badCoalesce. + * + * Assumes the expression contains no VOLATILE functions. + * + * Var's are allowed, but only if they are passed solely to IMMUTABLE functions + * + * We special-case CASE/COALESCE because those are evaluated lazily. We could evaluate + * CASE/COALESCE expressions which don't reference Vars, or partially evaluate some + * which do, but for now we just error out. That makes both the code and user-education + * easier. + */ +static bool +ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, bool *badCoalesce) +{ + bool result; + WalkerState data; + data.containsVar = data.varArgument = data.badCoalesce = false; + + result = ContainsDisallowedFunctionCallsWalker(expression, &data); + + *varArgument |= data.varArgument; + *badCoalesce |= data.badCoalesce; + return result; +} + + +static bool +ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) +{ + char volatileFlag = 0; + WalkerState childState; + bool containsDisallowedFunction = false; + + childState.containsVar = childState.varArgument = childState.badCoalesce = false; + + if (expression == NULL) + { + return false; + } + + if (IsA(expression, CoalesceExpr)) + { + CoalesceExpr* expr = (CoalesceExpr *) expression; + + if (contain_mutable_functions((Node *) (expr->args))) + { + state->badCoalesce = true; + return true; + } + else + { + /* + * There's no need to recurse. Since there are no STABLE functions + * varArgument will never be set. 
+ */ + return false; + } + } + + if (IsA(expression, CaseExpr)) + { + CaseExpr* expr = (CaseExpr *) expression; + ListCell *temp; + + /* + * contain_mutable_functions doesn't know what to do with CaseWhen so we + * have to break it out ourselves + */ + foreach(temp, expr->args) + { + CaseWhen *when = (CaseWhen *) lfirst(temp); + Assert(IsA(when, CaseWhen)); + + if (contain_mutable_functions((Node *) when->expr) || + contain_mutable_functions((Node *) when->result)) + { + state->badCoalesce = true; + return true; + } + } + + if (contain_mutable_functions((Node *) expr->defresult)) + { + state->badCoalesce = true; + return true; + } + + return ContainsDisallowedFunctionCallsWalker((Node *) (expr->arg), state); + } + + if (IsA(expression, Var)) + { + state->containsVar = true; + return false; + } + + /* + * In order for statement replication to give us consistent results it's important + * that we either disallow or evaluate on the master anything which has a volatility + * category above IMMUTABLE. Newer versions of postgres might add node types which + * should be checked in this function. + * + * Look through contain_mutable_functions_walker or future PG's equivalent for new + * node types before bumping this version number to fix compilation. + * + * Once you've added them to this check, make sure you also evaluate them in the + * executor! 
+ */ + StaticAssertStmt(PG_VERSION_NUM <= 90503, "When porting to a newer PG this section" + " needs to be reviewed."); + + if (IsA(expression, OpExpr)) + { + OpExpr *expr = (OpExpr *) expression; + + set_opfuncid(expr); + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, FuncExpr)) + { + FuncExpr *expr = (FuncExpr *) expression; + + volatileFlag = func_volatile(expr->funcid); + } + else if (IsA(expression, DistinctExpr)) + { + /* + * to exercise this, you need to create a custom type for which the '=' operator + * is STABLE/VOLATILE + */ + DistinctExpr *expr = (DistinctExpr *) expression; + + set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, NullIfExpr)) + { + /* + * same as above, exercising this requires a STABLE/VOLATILE '=' operator + */ + NullIfExpr *expr = (NullIfExpr *) expression; + + set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, ScalarArrayOpExpr)) + { + /* + * to exercise this you need to CREATE OPERATOR with a binary predicate + * and use it within an ANY/ALL clause. + */ + ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression; + + set_sa_opfuncid(expr); + volatileFlag = func_volatile(expr->opfuncid); + } + else if (IsA(expression, CoerceViaIO)) + { + /* + * to exercise this you need to use a type with a STABLE/VOLATILE intype or + * outtype. 
+ */ + CoerceViaIO *expr = (CoerceViaIO *) expression; + Oid iofunc; + Oid typioparam; + bool typisvarlena; + + /* check the result type's input function */ + getTypeInputInfo(expr->resulttype, + &iofunc, &typioparam); + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc)); + + /* check the input type's output function */ + getTypeOutputInfo(exprType((Node *) expr->arg), + &iofunc, &typisvarlena); + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc)); + } + else if (IsA(expression, ArrayCoerceExpr)) + { + ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression; + + if (OidIsValid(expr->elemfuncid)) + { + volatileFlag = func_volatile(expr->elemfuncid); + } + } + else if (IsA(expression, RowCompareExpr)) + { + RowCompareExpr *rcexpr = (RowCompareExpr *) expression; + ListCell *opid; + + foreach(opid, rcexpr->opnos) + { + volatileFlag = MostPermissiveVolatileFlag(volatileFlag, + op_volatile(lfirst_oid(opid))); + } + } + else if (IsA(expression, Query)) + { + /* subqueries aren't allowed and fail before control reaches this point */ + Assert(false); + } + + if (volatileFlag == PROVOLATILE_VOLATILE) + { + /* the caller should have already checked for this */ + Assert(false); + } + else if (volatileFlag == PROVOLATILE_STABLE) + { + containsDisallowedFunction = + expression_tree_walker(expression, + ContainsDisallowedFunctionCallsWalker, + &childState); + + if (childState.containsVar) + { + state->varArgument = true; + } + + state->badCoalesce |= childState.badCoalesce; + state->varArgument |= childState.varArgument; + + return (containsDisallowedFunction || childState.containsVar); + } + + /* keep traversing */ + return expression_tree_walker(expression, + ContainsDisallowedFunctionCallsWalker, + state); +} + + +/* + * Return the most-pessimistic volatility flag of the two params. + * + * for example: given two flags, if one is stable and one is volatile, an expression + * involving both is volatile. 
+ */ +char +MostPermissiveVolatileFlag(char left, char right) +{ + if (left == PROVOLATILE_VOLATILE || right == PROVOLATILE_VOLATILE) + { + return PROVOLATILE_VOLATILE; + } + else if (left == PROVOLATILE_STABLE || right == PROVOLATILE_STABLE) + { + return PROVOLATILE_STABLE; + } + else + { + return PROVOLATILE_IMMUTABLE; + } +} + + /* * RouterModifyTask builds a Task to represent a modification performed by * the provided query against the provided shard interval. This task contains diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c new file mode 100644 index 000000000..338664070 --- /dev/null +++ b/src/backend/distributed/utils/citus_clauses.c @@ -0,0 +1,310 @@ +/* + * citus_clauses.c + * + * Routines roughly equivalent to postgres' util/clauses. + * + * Copyright (c) 2016-2016, Citus Data, Inc. + */ + +#include "postgres.h" + +#include "distributed/citus_clauses.h" + +#include "catalog/pg_type.h" +#include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/primnodes.h" +#include "optimizer/clauses.h" +#include "optimizer/planmain.h" +#include "utils/datum.h" +#include "utils/lsyscache.h" + +static Node * PartiallyEvaluateExpression(Node *expression); +static Node * EvaluateNodeIfReferencesFunction(Node *expression); +static Node * PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar); +static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, + Oid result_collation); + + +/* + * Walks each TargetEntry of the query, evaluates sub-expressions without Vars. 
+ */ +void +ExecuteFunctions(Query *query) +{ + CmdType commandType = query->commandType; + ListCell *targetEntryCell = NULL; + Node *modifiedNode = NULL; + + if (query->jointree && query->jointree->quals) + { + query->jointree->quals = PartiallyEvaluateExpression(query->jointree->quals); + } + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* performance optimization for the most common cases */ + if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) + { + continue; + } + + if (commandType == CMD_INSERT) + { + modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr); + } + else + { + modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr); + } + + targetEntry->expr = (Expr *) modifiedNode; + } + + if(query->jointree) + { + Assert(!contain_mutable_functions((Node *) (query->jointree->quals))); + } + Assert(!contain_mutable_functions((Node *) (query->targetList))); +} + + +/* + * Walks the expression, evaluating any STABLE or IMMUTABLE functions so long as they + * don't reference Vars. + */ +static Node * +PartiallyEvaluateExpression(Node *expression) +{ + bool unused; + return PartiallyEvaluateExpressionWalker(expression, &unused); +} + + +/* + * When you find a function call evaluate it, the planner made sure there were no Vars + * + * Tell the parent whether you are a Var. If your child was a var tell your parent + * + * A little inefficient. It goes to the bottom of the tree then calls EvaluateExpression + * on each function on the way back up. Say we had an expression with no Vars, we could + * only call EvaluateExpression on the top-most level and get the same result. 
+ */ +static Node * +PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar) +{ + bool childContainsVar = false; + Node *copy = NULL; + + if (expression == NULL) + { + return expression; + } + + /* pass any argument lists back to the mutator to copy and recurse for us */ + if (IsA(expression, List)) + { + return expression_tree_mutator(expression, + PartiallyEvaluateExpressionWalker, + containsVar); + } + + if (IsA(expression, Var)) + { + *containsVar = true; + + /* makes a copy for us */ + return expression_tree_mutator(expression, + PartiallyEvaluateExpressionWalker, + containsVar); + } + + copy = expression_tree_mutator(expression, + PartiallyEvaluateExpressionWalker, + &childContainsVar); + + if (childContainsVar) + { + *containsVar = true; + } + else + { + copy = EvaluateNodeIfReferencesFunction(copy); + } + + return copy; +} + + +/* + * Used to evaluate functions during queries on the master before sending them to workers + * + * The idea isn't to evaluate every kind of expression, just the kinds whose result might + * change between invocations (the idea is to allow users to use functions but still have + * consistent shard replicas, since we use statement replication). This means evaluating + * all nodes which invoke functions which might not be IMMUTABLE. + */ +static Node * +EvaluateNodeIfReferencesFunction(Node *expression) +{ + if (IsA(expression, FuncExpr)) + { + FuncExpr *expr = (FuncExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->funcresulttype, + exprTypmod((Node *) expr), + expr->funccollid); + } + + if (IsA(expression, OpExpr) || + IsA(expression, DistinctExpr) || + IsA(expression, NullIfExpr)) + { + /* structural equivalence */ + OpExpr *expr = (OpExpr *) expression; + + /* typemod is always -1? 
*/ + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->opresulttype, -1, + expr->opcollid); + } + + if (IsA(expression, DistinctExpr)) + { + DistinctExpr *expr = (DistinctExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->opresulttype, + exprTypmod((Node *) expr), + expr->opcollid); + } + + if (IsA(expression, CoerceViaIO)) + { + CoerceViaIO *expr = (CoerceViaIO *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->resulttype, -1, + expr->resultcollid); + } + + if (IsA(expression, ArrayCoerceExpr)) + { + ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, + expr->resulttype, + expr->resulttypmod, + expr->resultcollid); + } + + if (IsA(expression, ScalarArrayOpExpr)) + { + /* TODO: Test this! */ + ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); + } + + if (IsA(expression, RowCompareExpr)) + { + /* TODO: Test this! */ + RowCompareExpr *expr = (RowCompareExpr *) expression; + + return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); + } + + return expression; +} + + +/* + * a copy of pg's evaluate_expr, pre-evaluate a constant expression + * + * We use the executor's routine ExecEvalExpr() to avoid duplication of + * code and ensure we get the same result as the executor would get. + * + * *INDENT-OFF* + */ +static Expr * +citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, + Oid result_collation) +{ + EState *estate; + ExprState *exprstate; + MemoryContext oldcontext; + Datum const_val; + bool const_is_null; + int16 resultTypLen; + bool resultTypByVal; + + /* + * To use the executor, we need an EState. + */ + estate = CreateExecutorState(); + + /* We can use the estate's working context to avoid memory leaks. 
*/ + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* Make sure any opfuncids are filled in. */ + fix_opfuncids((Node *) expr); + + /* + * Prepare expr for execution. (Note: we can't use ExecPrepareExpr + * because it'd result in recursively invoking eval_const_expressions.) + */ + exprstate = ExecInitExpr(expr, NULL); + + /* + * And evaluate it. + * + * It is OK to use a default econtext because none of the ExecEvalExpr() + * code used in this situation will use econtext. That might seem + * fortuitous, but it's not so unreasonable --- a constant expression does + * not depend on context, by definition, n'est ce pas? + */ + const_val = ExecEvalExprSwitchContext(exprstate, + GetPerTupleExprContext(estate), + &const_is_null, NULL); + + /* Get info needed about result datatype */ + get_typlenbyval(result_type, &resultTypLen, &resultTypByVal); + + /* Get back to outer memory context */ + MemoryContextSwitchTo(oldcontext); + + /* + * Must copy result out of sub-context used by expression eval. + * + * Also, if it's varlena, forcibly detoast it. This protects us against + * storing TOAST pointers into plans that might outlive the referenced + * data. (makeConst would handle detoasting anyway, but it's worth a few + * extra lines here so that we can do the copy and detoast in one step.) + */ + if (!const_is_null) + { + if (resultTypLen == -1) + const_val = PointerGetDatum(PG_DETOAST_DATUM_COPY(const_val)); + else + const_val = datumCopy(const_val, resultTypByVal, resultTypLen); + } + + /* Release all the junk we just created */ + FreeExecutorState(estate); + + /* + * Make the constant result node. 
+ */ + return (Expr *) makeConst(result_type, result_typmod, result_collation, + resultTypLen, + const_val, const_is_null, + resultTypByVal); +} + +/* *INDENT-ON* */ diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h new file mode 100644 index 000000000..81c5e4452 --- /dev/null +++ b/src/include/distributed/citus_clauses.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * citus_clauses.h + * Routines roughly equivalent to postgres' util/clauses. + * + * Copyright (c) 2012-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_NODEFUNCS_H +#define CITUS_NODEFUNCS_H + +#include "nodes/nodes.h" +#include "nodes/parsenodes.h" + +extern void ExecuteFunctions(Query *query); + +#endif /* CITUS_NODEFUNCS_H */ diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index f18af7686..e01540237 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -16,8 +16,7 @@ extern bool AllModificationsCommutative; extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); -extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, - Task *task); +extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); diff --git a/src/test/regress/expected/multi_function_evaluation.out b/src/test/regress/expected/multi_function_evaluation.out new file mode 100644 index 000000000..d451e4ad8 --- /dev/null +++ b/src/test/regress/expected/multi_function_evaluation.out @@ -0,0 +1,117 @@ +-- +-- MULTI_FUNCTION_EVALUATION +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 
1200000; +-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL) +CREATE TABLE example (key INT, value INT); +SELECT master_create_distributed_table('example', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +CREATE SEQUENCE example_value_seq; +SELECT master_create_worker_shards('example', 1, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO example VALUES (1, nextval('example_value_seq')); +SELECT * FROM example; + key | value +-----+------- + 1 | 1 +(1 row) + +-- functions called by prepared statements are also evaluated +PREPARE stmt AS INSERT INTO example VALUES (2); +EXECUTE stmt; +EXECUTE stmt; +SELECT * FROM example; + key | value +-----+------- + 1 | 1 + 2 | + 2 | +(3 rows) + +-- non-immutable functions inside CASE/COALESCE aren't allowed +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamp; +-- this is allowed because there are no mutable funcs in the CASE +UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1; +-- this is allowed because the planner strips away the CASE during constant evaluation +UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1; +-- this is not allowed because there're mutable functions in a CaseWhen clause +-- (which we can't easily evaluate on the master) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- make sure we also check defresult (the ELSE clause) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- COALESCE is allowed +UPDATE example SET 
value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1; +-- COALESCE is not allowed if there are any mutable functions +UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1; +ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements +-- RowCompareExpr's are checked for mutability. These are allowed: +ALTER TABLE example DROP value; +ALTER TABLE example ADD value boolean; +ALTER TABLE example ADD time_col timestamptz; +UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1; +UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1; +-- But this RowCompareExpr is not (it passes Var into STABLE) +UPDATE example SET value = NULLIF( + ROW(date '10-10-1000', 2) < ROW(time_col, 3), true +) WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- DistinctExpr's are also checked for mutability. 
These are allowed: +UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1; +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1; +-- But this RowCompare references the STABLE = (date, timestamptz) operator +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed +UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1; +-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator +UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- CoerceViaIO (typoutput -> typinput, a type coercion) +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date; +-- this one is allowed +UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1; +-- this one is not +UPDATE example SET value = time_col::date WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- ArrayCoerceExpr (applies elemfuncid to each elem) +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date[]; +-- this one is allowed +UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1; +-- this one is not +UPDATE example SET value = array[time_col]::date[] WHERE key = 1; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +-- test that UPDATE and DELETE also have the functions in WHERE evaluated +ALTER TABLE example DROP time_col; +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamptz; +INSERT INTO example VALUES (3, now()); +UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 
hour'; +SELECT * FROM example WHERE key = 3; + key | value +-----+------------------------------ + 3 | Tue Oct 10 00:00:00 2000 PDT +(1 row) + +DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + key | value +-----+------- +(0 rows) + +DROP TABLE example; diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index f3087ed9b..a69e6d80a 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -192,16 +192,15 @@ SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with expressions that cannot be collapsed are unsupported +ERROR: values given for the partition column must be constants or constant expressions +-- values for other columns are totally fine INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE +-- commands with mutable but non-volatile functions(ie: stable func.) 
in their quals +-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) +DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given modification @@ -433,12 +432,46 @@ UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWE 246 | gm | GM (1 row) --- updates referencing non-IMMUTABLE functions are unsupported -UPDATE limit_orders SET placed_at = now() WHERE id = 246; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ALTER TABLE limit_orders ADD COLUMN array_of_values integer[]; +-- updates referencing STABLE functions are allowed +UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246; +-- so are binary operators +UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246; +CREATE FUNCTION immutable_append(old_values int[], new_value int) +RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; +-- immutable function calls with vars are also allowed +UPDATE limit_orders +SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246; +CREATE FUNCTION stable_append(old_values int[], new_value int) +RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$ +LANGUAGE plpgsql STABLE; +-- but STABLE function calls with vars are not allowed +UPDATE limit_orders +SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246; +ERROR: STABLE functions used in UPDATE queries cannot be called with column references +SELECT array_of_values FROM limit_orders WHERE id = 246; + array_of_values +----------------- + {1,2} +(1 row) + +-- STRICT functions work as expected +CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS +'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE 
STRICT; +UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246; +ERROR: null value in column "bidder_id" violates not-null constraint +DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}). +CONTEXT: while executing command on localhost:57637 +SELECT array_of_values FROM limit_orders WHERE id = 246; + array_of_values +----------------- + {1,2} +(1 row) + +ALTER TABLE limit_orders DROP array_of_values; -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 0d4e389d6..1394563d3 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -34,13 +34,9 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table ERROR: relation "temporary_nondistributed_table" is not a distributed table -- commands with volatile functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE --- commands with stable functions in their quals -CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 
10;' LANGUAGE SQL STABLE; -SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE -- commands with immutable functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); master_modify_multiple_shards @@ -235,9 +231,14 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; 47 (1 row) +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; -- updates referencing non-IMMUTABLE functions are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + -- updates referencing IMMUTABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); master_modify_multiple_shards @@ -251,10 +252,21 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; 78 (1 row) --- updates referencing STABLE functions in SET section are not supported +-- updates referencing STABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + -- updates referencing VOLATILE functions in SET section are not supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 
10'); -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE +-- commands with stable functions in their quals are allowed +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 3b3ff22c1..665f6e65d 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -242,15 +242,15 @@ DETAIL: Subqueries are not supported in distributed modifications. -- non mutable function call in the SET INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = random()::int; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE -- non mutable function call in the WHERE INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE -- non mutable function call in the arbiter WHERE INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int DO UPDATE SET other_col = 5; -ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables 
must be marked IMMUTABLE -- error out on attempt to update the partition key INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET part_key = 15; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ed60ff120..5f725e68b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -154,3 +154,8 @@ test: multi_drop_extension # multi_schema_support makes sure we can work with tables in schemas other than public with no problem # ---------- test: multi_schema_support + +# ---------- +# multi_function_evaluation tests edge-cases in master-side function pre-evaluation +# ---------- +test: multi_function_evaluation diff --git a/src/test/regress/sql/multi_function_evaluation.sql b/src/test/regress/sql/multi_function_evaluation.sql new file mode 100644 index 000000000..f31f973ef --- /dev/null +++ b/src/test/regress/sql/multi_function_evaluation.sql @@ -0,0 +1,114 @@ +-- +-- MULTI_FUNCTION_EVALUATION +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000; + +-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL) + +CREATE TABLE example (key INT, value INT); +SELECT master_create_distributed_table('example', 'key', 'hash'); +CREATE SEQUENCE example_value_seq; +SELECT master_create_worker_shards('example', 1, 2); +INSERT INTO example VALUES (1, nextval('example_value_seq')); +SELECT * FROM example; + +-- functions called by prepared statements are also evaluated + +PREPARE stmt AS INSERT INTO example VALUES (2); +EXECUTE stmt; +EXECUTE stmt; +SELECT * FROM example; + +-- non-immutable functions inside CASE/COALESCE aren't allowed + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamp; + +-- this is allowed because there are no mutable funcs in the CASE +UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + 
interval '1 hour' END) WHERE key = 1; + +-- this is allowed because the planner strips away the CASE during constant evaluation +UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1; + +-- this is not allowed because there're mutable functions in a CaseWhen clause +-- (which we can't easily evaluate on the master) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1; + +-- make sure we also check defresult (the ELSE clause) +UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1; + +-- COALESCE is allowed +UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1; + +-- COALESCE is not allowed if there are any mutable functions +UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1; +UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1; + +-- RowCompareExpr's are checked for mutability. These are allowed: + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value boolean; +ALTER TABLE example ADD time_col timestamptz; + +UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1; +UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1; + +-- But this RowCompareExpr is not (it passes Var into STABLE) + +UPDATE example SET value = NULLIF( + ROW(date '10-10-1000', 2) < ROW(time_col, 3), true +) WHERE key = 1; + +-- DistinctExpr's are also checked for mutability. 
These are allowed: + +UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1; +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1; + +-- But this RowCompare references the STABLE = (date, timestamptz) operator + +UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1; + +-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed + +UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1; + +-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator + +UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1; + +-- CoerceViaIO (typoutput -> typinput, a type coercion) + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date; + +-- this one is allowed +UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1; +-- this one is not +UPDATE example SET value = time_col::date WHERE key = 1; + +-- ArrayCoerceExpr (applies elemfuncid to each elem) + +ALTER TABLE example DROP value; +ALTER TABLE example ADD value date[]; + +-- this one is allowed +UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1; +-- this one is not +UPDATE example SET value = array[time_col]::date[] WHERE key = 1; + +-- test that UPDATE and DELETE also have the functions in WHERE evaluated + +ALTER TABLE example DROP time_col; +ALTER TABLE example DROP value; +ALTER TABLE example ADD value timestamptz; + +INSERT INTO example VALUES (3, now()); +UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + +DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour'; +SELECT * FROM example WHERE key = 3; + +DROP TABLE example; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 0ff747cb7..1e3c806f3 100644 
--- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -137,14 +137,15 @@ SET client_min_messages TO DEFAULT; INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); --- commands with expressions that cannot be collapsed are unsupported +-- values for other columns are totally fine INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); --- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; +-- commands with mutable but non-volatile functions(ie: stable func.) in their quals +-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) +DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); @@ -310,8 +311,38 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; -- IMMUTABLE functions are allowed -- even in returning UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; --- updates referencing non-IMMUTABLE functions are unsupported -UPDATE limit_orders SET placed_at = now() WHERE id = 246; +ALTER TABLE limit_orders ADD COLUMN array_of_values integer[]; + +-- updates referencing STABLE functions are allowed +UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246; +-- so are binary operators +UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246; + +CREATE FUNCTION immutable_append(old_values int[], new_value int) +RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; + +-- immutable function calls with vars are also allowed +UPDATE 
limit_orders +SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246; + +CREATE FUNCTION stable_append(old_values int[], new_value int) +RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$ +LANGUAGE plpgsql STABLE; + +-- but STABLE function calls with vars are not allowed +UPDATE limit_orders +SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246; + +SELECT array_of_values FROM limit_orders WHERE id = 246; + +-- STRICT functions work as expected +CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS +'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT; +UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246; + +SELECT array_of_values FROM limit_orders WHERE id = 246; + +ALTER TABLE limit_orders DROP array_of_values; -- even in RETURNING UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql index 65d28a7d3..f8bd8f88c 100644 --- a/src/test/regress/sql/multi_shard_modify.sql +++ b/src/test/regress/sql/multi_shard_modify.sql @@ -60,10 +60,6 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); --- commands with stable functions in their quals -CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; -SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); - -- commands with immutable functions in their quals SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); @@ -140,6 +136,8 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; SELECT 
master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10'); SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; +CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE; + -- updates referencing non-IMMUTABLE functions are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); @@ -147,10 +145,13 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; --- updates referencing STABLE functions in SET section are not supported +-- updates referencing STABLE functions in SET section are supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); -- updates referencing VOLATILE functions in SET section are not supported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); +-- commands with stable functions in their quals are allowed +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()'); + ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; From d792c0af4d6ed6026351497d9d3245d20cb0f8c4 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Mon, 11 Jul 2016 15:16:44 +0300 Subject: [PATCH 2/6] citus_indent and some renaming --- .../executor/multi_router_executor.c | 3 +- .../master/master_modify_multiple_shards.c | 2 +- .../planner/multi_router_planner.c | 62 +++++++++++-------- src/backend/distributed/utils/citus_clauses.c | 25 ++++---- src/include/distributed/citus_clauses.h | 4 +- 5 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c 
b/src/backend/distributed/executor/multi_router_executor.c index 78931683d..25f3eb65d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -44,6 +44,7 @@ /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; + static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, @@ -331,7 +332,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, Query *query = multiPlan->workerJob->jobQuery; StringInfo queryStringInfo = makeStringInfo(); - ExecuteFunctions(query); + ExecuteMasterEvaluableFunctions(query); DeparseShardQuery(query, task, queryStringInfo); queryString = queryStringInfo->data; diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 74221bb42..6c7cf7833 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -125,7 +125,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) errmsg("master_modify_multiple_shards() does not support RETURNING"))); } - ExecuteFunctions(modifyQuery); + ExecuteMasterEvaluableFunctions(modifyQuery); shardIntervalList = LoadShardIntervalList(relationId); restrictClauseList = WhereClauseList(modifyQuery->jointree); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 595735ef7..b43d94587 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -56,7 +56,8 @@ #include "optimizer/planmain.h" -typedef struct { +typedef struct +{ bool containsVar; bool varArgument; bool badCoalesce; @@ -64,9 +65,9 @@ typedef struct { /* 
planner functions forward declarations */ -static bool ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, +static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); -static bool ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state); +static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static char MostPermissiveVolatileFlag(char left, char right); static Task * RouterModifyTask(Query *query); #if (PG_VERSION_NUM >= 90500) @@ -258,6 +259,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) /* reject queries which involve multi-row inserts */ if (hasValuesScan) { + /* + * if you remove this check you must also change the checks further in this + * method and ensure that VOLATILE function calls aren't allowed in INSERT + * statements. Currently they're allowed, but the function call is replaced + * with a constant, and if you're inserting multiple rows at once the function + * should return a different value for each row. 
+ */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given" " modification"), @@ -288,7 +296,7 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("functions used in UPDATE queries on distributed " - "tables must not be VOLATILE"))); + "tables must not be VOLATILE"))); } if (commandType == CMD_UPDATE && @@ -306,8 +314,8 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) } if (commandType == CMD_UPDATE && - ContainsDisallowedFunctionCalls((Node *) targetEntry->expr, - &hasVarArgument, &hasBadCoalesce)) + MasterIrreducibleExpression((Node *) targetEntry->expr, + &hasVarArgument, &hasBadCoalesce)) { Assert(hasVarArgument || hasBadCoalesce); } @@ -319,11 +327,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) if (contain_volatile_functions(joinTree->quals)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in the WHERE clause of modification " - "queries on distributed tables must not be VOLATILE"))); + errmsg("functions used in the WHERE clause of " + "modification queries on distributed tables " + "must not be VOLATILE"))); } - else if (ContainsDisallowedFunctionCalls(joinTree->quals, &hasVarArgument, - &hasBadCoalesce)) + else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument, + &hasBadCoalesce)) { Assert(hasVarArgument || hasBadCoalesce); } @@ -399,8 +408,9 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) else if (contain_mutable_functions((Node *) setTargetEntry->expr)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("functions used in the DO UPDATE SET clause of INSERTs " - "on distributed tables must be marked IMMUTABLE"))); + errmsg("functions used in the DO UPDATE SET clause of " + "INSERTs on distributed tables must be marked " + "IMMUTABLE"))); } } } @@ -412,7 +422,7 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) ereport(ERROR, 
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("functions used in the WHERE clause of the ON CONFLICT " "clause of INSERTs on distributed tables must be marked " - "IMMUTABLE"))); + "IMMUTABLE"))); } #endif @@ -441,13 +451,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) * easier. */ static bool -ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, bool *badCoalesce) +MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce) { bool result; WalkerState data; data.containsVar = data.varArgument = data.badCoalesce = false; - result = ContainsDisallowedFunctionCallsWalker(expression, &data); + result = MasterIrreducibleExpressionWalker(expression, &data); *varArgument |= data.varArgument; *badCoalesce |= data.badCoalesce; @@ -456,7 +466,7 @@ ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, bool *badCo static bool -ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) +MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) { char volatileFlag = 0; WalkerState childState; @@ -471,7 +481,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) if (IsA(expression, CoalesceExpr)) { - CoalesceExpr* expr = (CoalesceExpr *) expression; + CoalesceExpr *expr = (CoalesceExpr *) expression; if (contain_mutable_functions((Node *) (expr->args))) { @@ -490,7 +500,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) if (IsA(expression, CaseExpr)) { - CaseExpr* expr = (CaseExpr *) expression; + CaseExpr *expr = (CaseExpr *) expression; ListCell *temp; /* @@ -516,7 +526,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) return true; } - return ContainsDisallowedFunctionCallsWalker((Node *) (expr->arg), state); + return MasterIrreducibleExpressionWalker((Node *) (expr->arg), state); } if (IsA(expression, Var)) @@ -537,7 +547,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState 
*state) * Once you've added them to this check, make sure you also evaluate them in the * executor! */ - StaticAssertStmt(PG_VERSION_NUM <= 90503, "When porting to a newer PG this section" + StaticAssertStmt(PG_VERSION_NUM <= 90599, "When porting to a newer PG this section" " needs to be reviewed."); if (IsA(expression, OpExpr)) @@ -641,7 +651,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) { containsDisallowedFunction = expression_tree_walker(expression, - ContainsDisallowedFunctionCallsWalker, + MasterIrreducibleExpressionWalker, &childState); if (childState.containsVar) @@ -657,7 +667,7 @@ ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state) /* keep traversing */ return expression_tree_walker(expression, - ContainsDisallowedFunctionCallsWalker, + MasterIrreducibleExpressionWalker, state); } @@ -932,10 +942,10 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod) /* - * FastShardPruning is a higher level API for FindShardInterval function. Given the relationId - * of the distributed table and partitionValue, FastShardPruning function finds the corresponding - * shard interval that the partitionValue should be in. FastShardPruning returns NULL if no - * ShardIntervals exist for the given partitionValue. + * FastShardPruning is a higher level API for FindShardInterval function. Given the + * relationId of the distributed table and partitionValue, FastShardPruning function finds + * the corresponding shard interval that the partitionValue should be in. FastShardPruning + * returns NULL if no ShardIntervals exist for the given partitionValue. 
*/ static ShardInterval * FastShardPruning(Oid distributedTableId, Const *partitionValue) diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 338664070..780702398 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -1,7 +1,7 @@ /* * citus_clauses.c * - * Routines roughly equivalent to postgres' util/clauses. + * Routines roughly equivalent to postgres' util/clauses. * * Copyright (c) 2016-2016, Citus Data, Inc. */ @@ -23,16 +23,17 @@ static Node * PartiallyEvaluateExpression(Node *expression); static Node * EvaluateNodeIfReferencesFunction(Node *expression); -static Node * PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar); +static Node * PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar); static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, Oid result_collation); /* - * Walks each TargetEntry of the query, evaluates sub-expressions without Vars. + * Looks at each TargetEntry of the query and the jointree quals, evaluating + * any sub-expressions which don't include Vars. */ void -ExecuteFunctions(Query *query) +ExecuteMasterEvaluableFunctions(Query *query) { CmdType commandType = query->commandType; ListCell *targetEntryCell = NULL; @@ -65,7 +66,7 @@ ExecuteFunctions(Query *query) targetEntry->expr = (Expr *) modifiedNode; } - if(query->jointree) + if (query->jointree) { Assert(!contain_mutable_functions((Node *) (query->jointree->quals))); } @@ -81,21 +82,21 @@ static Node * PartiallyEvaluateExpression(Node *expression) { bool unused; - return PartiallyEvaluateExpressionWalker(expression, &unused); + return PartiallyEvaluateExpressionMutator(expression, &unused); } /* - * When you find a function call evaluate it, the planner made sure there were no Vars + * When you find a function call evaluate it, the planner made sure there were no Vars. 
* - * Tell the parent whether you are a Var. If your child was a var tell your parent + * Tell your parent if either you or one of your children is a Var. * * A little inefficient. It goes to the bottom of the tree then calls EvaluateExpression * on each function on the way back up. Say we had an expression with no Vars, we could * only call EvaluateExpression on the top-most level and get the same result. */ static Node * -PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar) +PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar) { bool childContainsVar = false; Node *copy = NULL; @@ -109,7 +110,7 @@ PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar) if (IsA(expression, List)) { return expression_tree_mutator(expression, - PartiallyEvaluateExpressionWalker, + PartiallyEvaluateExpressionMutator, containsVar); } @@ -119,12 +120,12 @@ PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar) /* makes a copy for us */ return expression_tree_mutator(expression, - PartiallyEvaluateExpressionWalker, + PartiallyEvaluateExpressionMutator, containsVar); } copy = expression_tree_mutator(expression, - PartiallyEvaluateExpressionWalker, + PartiallyEvaluateExpressionMutator, &childContainsVar); if (childContainsVar) diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index 81c5e4452..d6f98edfc 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * citus_clauses.h - * Routines roughly equivalent to postgres' util/clauses. + * Routines roughly equivalent to postgres' util/clauses. * * Copyright (c) 2012-2016, Citus Data, Inc.
* @@ -14,6 +14,6 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" -extern void ExecuteFunctions(Query *query); +extern void ExecuteMasterEvaluableFunctions(Query *query); #endif /* CITUS_NODEFUNCS_H */ From c46cb19cda7564f19ff55a2b418cb8ef515f56e0 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Mon, 11 Jul 2016 19:29:13 +0300 Subject: [PATCH 3/6] Only reparse queries if the planner flags them for reparsing --- .../executor/multi_router_executor.c | 2 +- .../planner/multi_router_planner.c | 4 +++ src/backend/distributed/utils/citus_clauses.c | 26 +++++++++++++++++++ .../distributed/utils/citus_outfuncs.c | 1 + .../distributed/utils/citus_readfuncs_94.c | 1 + .../distributed/utils/citus_readfuncs_95.c | 1 + src/include/distributed/citus_clauses.h | 1 + .../distributed/multi_physical_planner.h | 16 +++++++----- 8 files changed, 45 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 25f3eb65d..21785f26a 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -325,7 +325,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool gotResults = false; char *queryString = task->queryString; - if (isModificationQuery) + if (isModificationQuery && task->requiresMasterEvaluation) { PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index b43d94587..cb093744d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -21,6 +21,7 @@ #include "access/skey.h" #endif #include "access/xact.h" +#include "distributed/citus_clauses.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" 
#include "distributed/metadata_cache.h" @@ -710,6 +711,7 @@ RouterModifyTask(Query *query) StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; + bool requiresMasterEvaluation = RequiresMasterEvaluation(query); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -772,6 +774,7 @@ RouterModifyTask(Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; + modifyTask->requiresMasterEvaluation = requiresMasterEvaluation; return modifyTask; } @@ -1132,6 +1135,7 @@ RouterSelectTask(Query *query) task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; + task->requiresMasterEvaluation = false; return task; } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 780702398..9a93c9160 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -28,6 +28,32 @@ static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typm Oid result_collation); +/* + * Whether the executor needs to reparse and try to execute this query. + */ +bool +RequiresMasterEvaluation(Query *query) +{ + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (contain_mutable_functions((Node *) targetEntry->expr)) + { + return true; + } + } + + if (query->jointree && query->jointree->quals) + { + return contain_mutable_functions((Node *) query->jointree->quals); + } + + return false; +} + /* * Looks at each TargetEntry of the query and the jointree quals, evaluating * any sub-expressions which don't include Vars. 
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 2510cc28c..124185ff4 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -458,6 +458,7 @@ _outTask(StringInfo str, const Task *node) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); + WRITE_BOOL_FIELD(requiresMasterEvaluation); } diff --git a/src/backend/distributed/utils/citus_readfuncs_94.c b/src/backend/distributed/utils/citus_readfuncs_94.c index 4be2d093f..d7cb9d61f 100644 --- a/src/backend/distributed/utils/citus_readfuncs_94.c +++ b/src/backend/distributed/utils/citus_readfuncs_94.c @@ -1418,6 +1418,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index 89fc16753..caf223c00 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1507,6 +1507,7 @@ _readTask(void) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); + READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index d6f98edfc..acdc3e05a 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -14,6 +14,7 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" +extern bool RequiresMasterEvaluation(Query *query); extern void ExecuteMasterEvaluableFunctions(Query *query); #endif /* CITUS_NODEFUNCS_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ff769d18e..032b8f49f 100644 --- 
a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -141,6 +141,9 @@ typedef struct MapMergeJob * as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data * fetch tasks. We also forward declare the task execution struct here to avoid * including the executor header files. + * + * NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask + * in citus_readfuncs to correctly (de)serialize this struct. */ typedef struct TaskExecution TaskExecution; @@ -156,12 +159,13 @@ typedef struct Task List *dependedTaskList; /* only applies to compute tasks */ uint32 partitionId; - uint32 upstreamTaskId; /* only applies to data fetch tasks */ - ShardInterval *shardInterval; /* only applies to merge tasks */ - bool assignmentConstrained; /* only applies to merge tasks */ - uint64 shardId; /* only applies to shard fetch tasks */ - TaskExecution *taskExecution; /* used by task tracker executor */ - bool upsertQuery; /* only applies to modify tasks */ + uint32 upstreamTaskId; /* only applies to data fetch tasks */ + ShardInterval *shardInterval; /* only applies to merge tasks */ + bool assignmentConstrained; /* only applies to merge tasks */ + uint64 shardId; /* only applies to shard fetch tasks */ + TaskExecution *taskExecution; /* used by task tracker executor */ + bool upsertQuery; /* only applies to modify tasks */ + bool requiresMasterEvaluation; /* only applies to modify tasks */ } Task; From 9a5e529f6f9366a5cdd3fb94e571ff27fcfd73a5 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Mon, 11 Jul 2016 20:05:47 +0300 Subject: [PATCH 4/6] cosmetic changes --- src/backend/distributed/planner/multi_router_planner.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index cb093744d..be66e6067 100644 --- 
a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -260,10 +260,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) /* reject queries which involve multi-row inserts */ if (hasValuesScan) { - /* - * if you remove this check you must also change the checks further in this + /* + * NB: If you remove this check you must also change the checks further in this * method and ensure that VOLATILE function calls aren't allowed in INSERT - * statements. Currently they're allowed, but the function call is replaced + * statements. Currently they're allowed but the function call is replaced * with a constant, and if you're inserting multiple rows at once the function * should return a different value for each row. */ @@ -548,8 +548,8 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) * Once you've added them to this check, make sure you also evaluate them in the * executor! */ - StaticAssertStmt(PG_VERSION_NUM <= 90599, "When porting to a newer PG this section" - " needs to be reviewed."); + StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section" + " needs to be reviewed."); if (IsA(expression, OpExpr)) { From 728eefcf2bd35ec91009cde8a726e1e2e5fbe133 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Tue, 12 Jul 2016 11:31:27 +0300 Subject: [PATCH 5/6] Simplify code and fix include guards in citus_clauses --- .../executor/multi_router_executor.c | 4 +-- .../planner/multi_router_planner.c | 30 +++---------------- src/backend/distributed/utils/citus_clauses.c | 18 ++--------- src/include/distributed/citus_clauses.h | 6 ++-- 4 files changed, 11 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 21785f26a..961345a12 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c 
@@ -336,8 +336,8 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, DeparseShardQuery(query, task, queryStringInfo); queryString = queryStringInfo->data; - elog(DEBUG4, "old query: %s", task->queryString); - elog(DEBUG4, "new query: %s", queryString); + elog(DEBUG4, "query before master evaluation: %s", task->queryString); + elog(DEBUG4, "query after master evaluation: %s", queryString); } /* diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index be66e6067..381acd737 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -57,7 +57,7 @@ #include "optimizer/planmain.h" -typedef struct +typedef struct WalkerState { bool containsVar; bool varArgument; @@ -470,11 +470,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) { char volatileFlag = 0; - WalkerState childState; + WalkerState childState = {false, false, false}; bool containsDisallowedFunction = false; - childState.containsVar = childState.varArgument = childState.badCoalesce = false; - if (expression == NULL) { return false; @@ -501,33 +499,13 @@ MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) if (IsA(expression, CaseExpr)) { - CaseExpr *expr = (CaseExpr *) expression; - ListCell *temp; - - /* - * contain_mutable_functions doesn't know what to do with CaseWhen so we - * have to break it out ourselves - */ - foreach(temp, expr->args) - { - CaseWhen *when = (CaseWhen *) lfirst(temp); - Assert(IsA(when, CaseWhen)); - - if (contain_mutable_functions((Node *) when->expr) || - contain_mutable_functions((Node *) when->result)) - { - state->badCoalesce = true; - return true; - } - } - - if (contain_mutable_functions((Node *) expr->defresult)) + if (contain_mutable_functions(expression)) { state->badCoalesce = true; return true; } - return MasterIrreducibleExpressionWalker((Node *) (expr->arg), 
state); + return false; } if (IsA(expression, Var)) diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 9a93c9160..18a5c7079 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -101,8 +101,8 @@ ExecuteMasterEvaluableFunctions(Query *query) /* - * Walks the expression, evaluating any STABLE or IMMUTABLE functions so long as they - * don't reference Vars. + * Walks the expression evaluating any node which invokes a function as long as a Var + * doesn't show up in the parameter list. */ static Node * PartiallyEvaluateExpression(Node *expression) @@ -195,23 +195,11 @@ EvaluateNodeIfReferencesFunction(Node *expression) /* structural equivalence */ OpExpr *expr = (OpExpr *) expression; - /* typemod is always -1? */ - return (Node *) citus_evaluate_expr((Expr *) expr, expr->opresulttype, -1, expr->opcollid); } - if (IsA(expression, DistinctExpr)) - { - DistinctExpr *expr = (DistinctExpr *) expression; - - return (Node *) citus_evaluate_expr((Expr *) expr, - expr->opresulttype, - exprTypmod((Node *) expr), - expr->opcollid); - } - if (IsA(expression, CoerceViaIO)) { CoerceViaIO *expr = (CoerceViaIO *) expression; @@ -233,7 +221,6 @@ EvaluateNodeIfReferencesFunction(Node *expression) if (IsA(expression, ScalarArrayOpExpr)) { - /* TODO: Test this! */ ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression; return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); @@ -241,7 +228,6 @@ EvaluateNodeIfReferencesFunction(Node *expression) if (IsA(expression, RowCompareExpr)) { - /* TODO: Test this! 
*/ RowCompareExpr *expr = (RowCompareExpr *) expression; return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid); diff --git a/src/include/distributed/citus_clauses.h b/src/include/distributed/citus_clauses.h index acdc3e05a..29905850a 100644 --- a/src/include/distributed/citus_clauses.h +++ b/src/include/distributed/citus_clauses.h @@ -8,8 +8,8 @@ *------------------------------------------------------------------------- */ -#ifndef CITUS_NODEFUNCS_H -#define CITUS_NODEFUNCS_H +#ifndef CITUS_CLAUSES_H +#define CITUS_CLAUSES_H #include "nodes/nodes.h" #include "nodes/parsenodes.h" @@ -17,4 +17,4 @@ extern bool RequiresMasterEvaluation(Query *query); extern void ExecuteMasterEvaluableFunctions(Query *query); -#endif /* CITUS_NODEFUNCS_H */ +#endif /* CITUS_CLAUSES_H */ From bafafcd1bfc8e181b79d4389927f5f02468639ac Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 13 Jul 2016 11:41:44 -0700 Subject: [PATCH 6/6] citus_indent fixups --- src/backend/distributed/planner/multi_router_planner.c | 4 ++-- src/backend/distributed/utils/citus_clauses.c | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 381acd737..14513afec 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -67,7 +67,7 @@ typedef struct WalkerState /* planner functions forward declarations */ static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, - bool *badCoalesce); + bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static char MostPermissiveVolatileFlag(char left, char right); static Task * RouterModifyTask(Query *query); @@ -470,7 +470,7 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state) { char volatileFlag = 0; - WalkerState childState = {false, false, 
false}; + WalkerState childState = { false, false, false }; bool containsDisallowedFunction = false; if (expression == NULL) diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 18a5c7079..521ad3aa7 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -54,6 +54,7 @@ RequiresMasterEvaluation(Query *query) return false; } + /* * Looks at each TargetEntry of the query and the jointree quals, evaluating * any sub-expressions which don't include Vars.