Merge pull request #488 from citusdata/213-support-stable-functions

Evaluate functions on master
pull/596/head
Andres Freund 2016-07-13 13:42:45 -07:00 committed by GitHub
commit a69a25781a
19 changed files with 1083 additions and 79 deletions

View File

@ -206,18 +206,8 @@ multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR) if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR)
{ {
Task *task = NULL;
PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement);
List *taskList = multiPlan->workerJob->taskList;
/* router executor can only execute distributed plans with a single task */
Assert(list_length(taskList) == 1);
task = (Task *) linitial(taskList);
/* drop into the router executor */ /* drop into the router executor */
RouterExecutorRun(queryDesc, direction, count, task); RouterExecutorRun(queryDesc, direction, count);
} }
else else
{ {

View File

@ -19,21 +19,26 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "access/xact.h" #include "access/xact.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "executor/executor.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "optimizer/clauses.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h" #include "utils/elog.h"
#include "utils/errcodes.h" #include "utils/errcodes.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/palloc.h" #include "utils/palloc.h"
#include "utils/int8.h" #include "utils/int8.h"
#if (PG_VERSION_NUM >= 90500)
#include "utils/ruleutils.h"
#endif
/* controls use of locks to enforce safe commutativity */ /* controls use of locks to enforce safe commutativity */
@ -49,6 +54,7 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination, DestReceiver *destination,
Tuplestorestate *tupleStore); Tuplestorestate *tupleStore);
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query); static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
TupleDesc tupleDescriptor, int64 *rows); TupleDesc tupleDescriptor, int64 *rows);
@ -179,8 +185,12 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
* RouterExecutorRun actually executes a single task on a worker. * RouterExecutorRun actually executes a single task on a worker.
*/ */
void void
RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task) RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
{ {
PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement);
List *taskList = multiPlan->workerJob->taskList;
Task *task = NULL;
EState *estate = queryDesc->estate; EState *estate = queryDesc->estate;
CmdType operation = queryDesc->operation; CmdType operation = queryDesc->operation;
MemoryContext oldcontext = NULL; MemoryContext oldcontext = NULL;
@ -188,6 +198,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
MaterialState *routerState = (MaterialState *) queryDesc->planstate; MaterialState *routerState = (MaterialState *) queryDesc->planstate;
bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;
/* router executor can only execute distributed plans with a single task */
Assert(list_length(taskList) == 1);
task = (Task *) linitial(taskList);
Assert(estate != NULL); Assert(estate != NULL);
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
Assert(task != NULL); Assert(task != NULL);
@ -309,6 +323,22 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
ListCell *failedPlacementCell = NULL; ListCell *failedPlacementCell = NULL;
int64 affectedTupleCount = -1; int64 affectedTupleCount = -1;
bool gotResults = false; bool gotResults = false;
char *queryString = task->queryString;
if (isModificationQuery && task->requiresMasterEvaluation)
{
PlannedStmt *planStatement = queryDesc->plannedstmt;
MultiPlan *multiPlan = GetMultiPlan(planStatement);
Query *query = multiPlan->workerJob->jobQuery;
StringInfo queryStringInfo = makeStringInfo();
ExecuteMasterEvaluableFunctions(query);
DeparseShardQuery(query, task, queryStringInfo);
queryString = queryStringInfo->data;
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
elog(DEBUG4, "query after master evaluation: %s", queryString);
}
/* /*
* Try to run the query to completion on one placement. If the query fails * Try to run the query to completion on one placement. If the query fails
@ -329,7 +359,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
continue; continue;
} }
queryOK = SendQueryInSingleRowMode(connection, task->queryString); queryOK = SendQueryInSingleRowMode(connection, queryString);
if (!queryOK) if (!queryOK)
{ {
PurgeConnection(connection); PurgeConnection(connection);
@ -425,6 +455,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
} }
static void
DeparseShardQuery(Query *query, Task *task, StringInfo queryString)
{
uint64 shardId = task->anchorShardId;
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
deparse_shard_query(query, relid, shardId, queryString);
}
/* /*
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a * ReturnRowsFromTuplestore moves rows from a given tuplestore into a
* receiver. It performs the necessary limiting to support cursors. * receiver. It performs the necessary limiting to support cursors.

View File

@ -24,6 +24,7 @@
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/event_trigger.h" #include "commands/event_trigger.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
@ -124,6 +125,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
errmsg("master_modify_multiple_shards() does not support RETURNING"))); errmsg("master_modify_multiple_shards() does not support RETURNING")));
} }
ExecuteMasterEvaluableFunctions(modifyQuery);
shardIntervalList = LoadShardIntervalList(relationId); shardIntervalList = LoadShardIntervalList(relationId);
restrictClauseList = WhereClauseList(modifyQuery->jointree); restrictClauseList = WhereClauseList(modifyQuery->jointree);

View File

@ -21,6 +21,7 @@
#include "access/skey.h" #include "access/skey.h"
#endif #endif
#include "access/xact.h" #include "access/xact.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -51,11 +52,24 @@
#include "utils/errcodes.h" #include "utils/errcodes.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/typcache.h" #include "catalog/pg_proc.h"
#include "optimizer/planmain.h"
typedef struct WalkerState
{
bool containsVar;
bool varArgument;
bool badCoalesce;
} WalkerState;
/* planner functions forward declarations */ /* planner functions forward declarations */
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
static char MostPermissiveVolatileFlag(char left, char right);
static Task * RouterModifyTask(Query *query); static Task * RouterModifyTask(Query *query);
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
static OnConflictExpr * RebuildOnConflict(Oid relationId, static OnConflictExpr * RebuildOnConflict(Oid relationId,
@ -143,8 +157,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
bool hasValuesScan = false; bool hasValuesScan = false;
uint32 queryTableCount = 0; uint32 queryTableCount = 0;
bool hasNonConstTargetEntryExprs = false;
bool hasNonConstQualExprs = false;
bool specifiesPartitionValue = false; bool specifiesPartitionValue = false;
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
ListCell *setTargetCell = NULL; ListCell *setTargetCell = NULL;
@ -248,6 +260,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
/* reject queries which involve multi-row inserts */ /* reject queries which involve multi-row inserts */
if (hasValuesScan) if (hasValuesScan)
{ {
/*
* NB: If you remove this check you must also change the checks further in this
* method and ensure that VOLATILE function calls aren't allowed in INSERT
* statements. Currently they're allowed but the function call is replaced
* with a constant, and if you're inserting multiple rows at once the function
* should return a different value for each row.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given" errmsg("cannot perform distributed planning for the given"
" modification"), " modification"),
@ -258,6 +277,8 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE) commandType == CMD_DELETE)
{ {
bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */
FromExpr *joinTree = NULL; FromExpr *joinTree = NULL;
ListCell *targetEntryCell = NULL; ListCell *targetEntryCell = NULL;
@ -271,9 +292,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
continue; continue;
} }
if (contain_mutable_functions((Node *) targetEntry->expr)) if (commandType == CMD_UPDATE &&
contain_volatile_functions((Node *) targetEntry->expr))
{ {
hasNonConstTargetEntryExprs = true; ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functions used in UPDATE queries on distributed "
"tables must not be VOLATILE")));
} }
if (commandType == CMD_UPDATE && if (commandType == CMD_UPDATE &&
@ -281,17 +305,59 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
{ {
specifiesPartitionValue = true; specifiesPartitionValue = true;
} }
if (targetEntry->resno == partitionColumn->varattno &&
!IsA(targetEntry->expr, Const))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("values given for the partition column must be"
" constants or constant expressions")));
}
if (commandType == CMD_UPDATE &&
MasterIrreducibleExpression((Node *) targetEntry->expr,
&hasVarArgument, &hasBadCoalesce))
{
Assert(hasVarArgument || hasBadCoalesce);
}
} }
joinTree = queryTree->jointree; joinTree = queryTree->jointree;
if (joinTree != NULL && contain_mutable_functions(joinTree->quals)) if (joinTree != NULL)
{ {
hasNonConstQualExprs = true; if (contain_volatile_functions(joinTree->quals))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functions used in the WHERE clause of "
"modification queries on distributed tables "
"must not be VOLATILE")));
}
else if (MasterIrreducibleExpression(joinTree->quals, &hasVarArgument,
&hasBadCoalesce))
{
Assert(hasVarArgument || hasBadCoalesce);
}
}
if (hasVarArgument)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("STABLE functions used in UPDATE queries"
" cannot be called with column references")));
}
if (hasBadCoalesce)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("non-IMMUTABLE functions are not allowed in CASE or"
" COALESCE statements")));
} }
if (contain_mutable_functions((Node *) queryTree->returningList)) if (contain_mutable_functions((Node *) queryTree->returningList))
{ {
hasNonConstTargetEntryExprs = true; ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("non-IMMUTABLE functions are not allowed in the"
" RETURNING clause")));
} }
} }
@ -342,7 +408,10 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
} }
else if (contain_mutable_functions((Node *) setTargetEntry->expr)) else if (contain_mutable_functions((Node *) setTargetEntry->expr))
{ {
hasNonConstTargetEntryExprs = true; ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functions used in the DO UPDATE SET clause of "
"INSERTs on distributed tables must be marked "
"IMMUTABLE")));
} }
} }
} }
@ -351,17 +420,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
if (contain_mutable_functions((Node *) arbiterWhere) || if (contain_mutable_functions((Node *) arbiterWhere) ||
contain_mutable_functions((Node *) onConflictWhere)) contain_mutable_functions((Node *) onConflictWhere))
{ {
hasNonConstQualExprs = true; ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functions used in the WHERE clause of the ON CONFLICT "
"clause of INSERTs on distributed tables must be marked "
"IMMUTABLE")));
} }
#endif #endif
if (hasNonConstTargetEntryExprs || hasNonConstQualExprs)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("functions used in modification queries on distributed "
"tables must be marked IMMUTABLE")));
}
if (specifiesPartitionValue) if (specifiesPartitionValue)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -370,6 +435,246 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
} }
/*
* If the expression contains STABLE functions which accept any parameters derived from a
* Var returns true and sets varArgument.
*
* If the expression contains a CASE or COALESCE which invoke non-IMMUTABLE functions
* returns true and sets badCoalesce.
*
* Assumes the expression contains no VOLATILE functions.
*
* Var's are allowed, but only if they are passed solely to IMMUTABLE functions
*
* We special-case CASE/COALESCE because those are evaluated lazily. We could evaluate
* CASE/COALESCE expressions which don't reference Vars, or partially evaluate some
* which do, but for now we just error out. That makes both the code and user-education
* easier.
*/
static bool
MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce)
{
bool result;
WalkerState data;
data.containsVar = data.varArgument = data.badCoalesce = false;
result = MasterIrreducibleExpressionWalker(expression, &data);
*varArgument |= data.varArgument;
*badCoalesce |= data.badCoalesce;
return result;
}
static bool
MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state)
{
char volatileFlag = 0;
WalkerState childState = { false, false, false };
bool containsDisallowedFunction = false;
if (expression == NULL)
{
return false;
}
if (IsA(expression, CoalesceExpr))
{
CoalesceExpr *expr = (CoalesceExpr *) expression;
if (contain_mutable_functions((Node *) (expr->args)))
{
state->badCoalesce = true;
return true;
}
else
{
/*
* There's no need to recurse. Since there are no STABLE functions
* varArgument will never be set.
*/
return false;
}
}
if (IsA(expression, CaseExpr))
{
if (contain_mutable_functions(expression))
{
state->badCoalesce = true;
return true;
}
return false;
}
if (IsA(expression, Var))
{
state->containsVar = true;
return false;
}
/*
* In order for statement replication to give us consistent results it's important
* that we either disallow or evaluate on the master anything which has a volatility
* category above IMMUTABLE. Newer versions of postgres might add node types which
* should be checked in this function.
*
* Look through contain_mutable_functions_walker or future PG's equivalent for new
* node types before bumping this version number to fix compilation.
*
* Once you've added them to this check, make sure you also evaluate them in the
* executor!
*/
StaticAssertStmt(PG_VERSION_NUM < 90600, "When porting to a newer PG this section"
" needs to be reviewed.");
if (IsA(expression, OpExpr))
{
OpExpr *expr = (OpExpr *) expression;
set_opfuncid(expr);
volatileFlag = func_volatile(expr->opfuncid);
}
else if (IsA(expression, FuncExpr))
{
FuncExpr *expr = (FuncExpr *) expression;
volatileFlag = func_volatile(expr->funcid);
}
else if (IsA(expression, DistinctExpr))
{
/*
* to exercise this, you need to create a custom type for which the '=' operator
* is STABLE/VOLATILE
*/
DistinctExpr *expr = (DistinctExpr *) expression;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
volatileFlag = func_volatile(expr->opfuncid);
}
else if (IsA(expression, NullIfExpr))
{
/*
* same as above, exercising this requires a STABLE/VOLATILE '=' operator
*/
NullIfExpr *expr = (NullIfExpr *) expression;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
volatileFlag = func_volatile(expr->opfuncid);
}
else if (IsA(expression, ScalarArrayOpExpr))
{
/*
* to exercise this you need to CREATE OPERATOR with a binary predicate
* and use it within an ANY/ALL clause.
*/
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
set_sa_opfuncid(expr);
volatileFlag = func_volatile(expr->opfuncid);
}
else if (IsA(expression, CoerceViaIO))
{
/*
* to exercise this you need to use a type with a STABLE/VOLATILE intype or
* outtype.
*/
CoerceViaIO *expr = (CoerceViaIO *) expression;
Oid iofunc;
Oid typioparam;
bool typisvarlena;
/* check the result type's input function */
getTypeInputInfo(expr->resulttype,
&iofunc, &typioparam);
volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc));
/* check the input type's output function */
getTypeOutputInfo(exprType((Node *) expr->arg),
&iofunc, &typisvarlena);
volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc));
}
else if (IsA(expression, ArrayCoerceExpr))
{
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
if (OidIsValid(expr->elemfuncid))
{
volatileFlag = func_volatile(expr->elemfuncid);
}
}
else if (IsA(expression, RowCompareExpr))
{
RowCompareExpr *rcexpr = (RowCompareExpr *) expression;
ListCell *opid;
foreach(opid, rcexpr->opnos)
{
volatileFlag = MostPermissiveVolatileFlag(volatileFlag,
op_volatile(lfirst_oid(opid)));
}
}
else if (IsA(expression, Query))
{
/* subqueries aren't allowed and fail before control reaches this point */
Assert(false);
}
if (volatileFlag == PROVOLATILE_VOLATILE)
{
/* the caller should have already checked for this */
Assert(false);
}
else if (volatileFlag == PROVOLATILE_STABLE)
{
containsDisallowedFunction =
expression_tree_walker(expression,
MasterIrreducibleExpressionWalker,
&childState);
if (childState.containsVar)
{
state->varArgument = true;
}
state->badCoalesce |= childState.badCoalesce;
state->varArgument |= childState.varArgument;
return (containsDisallowedFunction || childState.containsVar);
}
/* keep traversing */
return expression_tree_walker(expression,
MasterIrreducibleExpressionWalker,
state);
}
/*
* Return the most-pessimistic volatility flag of the two params.
*
* for example: given two flags, if one is stable and one is volatile, an expression
* involving both is volatile.
*/
char
MostPermissiveVolatileFlag(char left, char right)
{
if (left == PROVOLATILE_VOLATILE || right == PROVOLATILE_VOLATILE)
{
return PROVOLATILE_VOLATILE;
}
else if (left == PROVOLATILE_STABLE || right == PROVOLATILE_STABLE)
{
return PROVOLATILE_STABLE;
}
else
{
return PROVOLATILE_IMMUTABLE;
}
}
/* /*
* RouterModifyTask builds a Task to represent a modification performed by * RouterModifyTask builds a Task to represent a modification performed by
* the provided query against the provided shard interval. This task contains * the provided query against the provided shard interval. This task contains
@ -384,6 +689,7 @@ RouterModifyTask(Query *query)
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL; Task *modifyTask = NULL;
bool upsertQuery = false; bool upsertQuery = false;
bool requiresMasterEvaluation = RequiresMasterEvaluation(query);
/* grab shared metadata lock to stop concurrent placement additions */ /* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock); LockShardDistributionMetadata(shardId, ShareLock);
@ -446,6 +752,7 @@ RouterModifyTask(Query *query)
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL; modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery; modifyTask->upsertQuery = upsertQuery;
modifyTask->requiresMasterEvaluation = requiresMasterEvaluation;
return modifyTask; return modifyTask;
} }
@ -616,10 +923,10 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod)
/* /*
* FastShardPruning is a higher level API for FindShardInterval function. Given the relationId * FastShardPruning is a higher level API for FindShardInterval function. Given the
* of the distributed table and partitionValue, FastShardPruning function finds the corresponding * relationId of the distributed table and partitionValue, FastShardPruning function finds
* shard interval that the partitionValue should be in. FastShardPruning returns NULL if no * the corresponding shard interval that the partitionValue should be in. FastShardPruning
* ShardIntervals exist for the given partitionValue. * returns NULL if no ShardIntervals exist for the given partitionValue.
*/ */
static ShardInterval * static ShardInterval *
FastShardPruning(Oid distributedTableId, Const *partitionValue) FastShardPruning(Oid distributedTableId, Const *partitionValue)
@ -806,6 +1113,7 @@ RouterSelectTask(Query *query)
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->dependedTaskList = NIL; task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery; task->upsertQuery = upsertQuery;
task->requiresMasterEvaluation = false;
return task; return task;
} }

View File

@ -0,0 +1,324 @@
/*
* citus_clauses.c
*
* Routines roughly equivalent to postgres' util/clauses.
*
* Copyright (c) 2016-2016, Citus Data, Inc.
*/
#include "postgres.h"
#include "distributed/citus_clauses.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
static Node * PartiallyEvaluateExpression(Node *expression);
static Node * EvaluateNodeIfReferencesFunction(Node *expression);
static Node * PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar);
static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
Oid result_collation);
/*
* Whether the executor needs to reparse and try to execute this query.
*/
bool
RequiresMasterEvaluation(Query *query)
{
ListCell *targetEntryCell = NULL;
foreach(targetEntryCell, query->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
if (contain_mutable_functions((Node *) targetEntry->expr))
{
return true;
}
}
if (query->jointree && query->jointree->quals)
{
return contain_mutable_functions((Node *) query->jointree->quals);
}
return false;
}
/*
* Looks at each TargetEntry of the query and the jointree quals, evaluating
* any sub-expressions which don't include Vars.
*/
void
ExecuteMasterEvaluableFunctions(Query *query)
{
CmdType commandType = query->commandType;
ListCell *targetEntryCell = NULL;
Node *modifiedNode = NULL;
if (query->jointree && query->jointree->quals)
{
query->jointree->quals = PartiallyEvaluateExpression(query->jointree->quals);
}
foreach(targetEntryCell, query->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
/* performance optimization for the most common cases */
if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var))
{
continue;
}
if (commandType == CMD_INSERT)
{
modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr);
}
else
{
modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr);
}
targetEntry->expr = (Expr *) modifiedNode;
}
if (query->jointree)
{
Assert(!contain_mutable_functions((Node *) (query->jointree->quals)));
}
Assert(!contain_mutable_functions((Node *) (query->targetList)));
}
/*
* Walks the expression evaluating any node which invokes a function as long as a Var
* doesn't show up in the parameter list.
*/
static Node *
PartiallyEvaluateExpression(Node *expression)
{
bool unused;
return PartiallyEvaluateExpressionMutator(expression, &unused);
}
/*
* When you find a function call evaluate it, the planner made sure there were no Vars.
*
* Tell your parent if either you or one if your children is a Var.
*
* A little inefficient. It goes to the bottom of the tree then calls EvaluateExpression
* on each function on the way back up. Say we had an expression with no Vars, we could
* only call EvaluateExpression on the top-most level and get the same result.
*/
static Node *
PartiallyEvaluateExpressionMutator(Node *expression, bool *containsVar)
{
bool childContainsVar = false;
Node *copy = NULL;
if (expression == NULL)
{
return expression;
}
/* pass any argument lists back to the mutator to copy and recurse for us */
if (IsA(expression, List))
{
return expression_tree_mutator(expression,
PartiallyEvaluateExpressionMutator,
containsVar);
}
if (IsA(expression, Var))
{
*containsVar = true;
/* makes a copy for us */
return expression_tree_mutator(expression,
PartiallyEvaluateExpressionMutator,
containsVar);
}
copy = expression_tree_mutator(expression,
PartiallyEvaluateExpressionMutator,
&childContainsVar);
if (childContainsVar)
{
*containsVar = true;
}
else
{
copy = EvaluateNodeIfReferencesFunction(copy);
}
return copy;
}
/*
* Used to evaluate functions during queries on the master before sending them to workers
*
* The idea isn't to evaluate every kind of expression, just the kinds whoes result might
* change between invocations (the idea is to allow users to use functions but still have
* consistent shard replicas, since we use statement replication). This means evaluating
* all nodes which invoke functions which might not be IMMUTABLE.
*/
static Node *
EvaluateNodeIfReferencesFunction(Node *expression)
{
if (IsA(expression, FuncExpr))
{
FuncExpr *expr = (FuncExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->funcresulttype,
exprTypmod((Node *) expr),
expr->funccollid);
}
if (IsA(expression, OpExpr) ||
IsA(expression, DistinctExpr) ||
IsA(expression, NullIfExpr))
{
/* structural equivalence */
OpExpr *expr = (OpExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->opresulttype, -1,
expr->opcollid);
}
if (IsA(expression, CoerceViaIO))
{
CoerceViaIO *expr = (CoerceViaIO *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->resulttype, -1,
expr->resultcollid);
}
if (IsA(expression, ArrayCoerceExpr))
{
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->resulttype,
expr->resulttypmod,
expr->resultcollid);
}
if (IsA(expression, ScalarArrayOpExpr))
{
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid);
}
if (IsA(expression, RowCompareExpr))
{
RowCompareExpr *expr = (RowCompareExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid);
}
return expression;
}
/*
* a copy of pg's evaluate_expr, pre-evaluate a constant expression
*
* We use the executor's routine ExecEvalExpr() to avoid duplication of
* code and ensure we get the same result as the executor would get.
*
* *INDENT-OFF*
*/
static Expr *
citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
Oid result_collation)
{
EState *estate;
ExprState *exprstate;
MemoryContext oldcontext;
Datum const_val;
bool const_is_null;
int16 resultTypLen;
bool resultTypByVal;
/*
* To use the executor, we need an EState.
*/
estate = CreateExecutorState();
/* We can use the estate's working context to avoid memory leaks. */
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/* Make sure any opfuncids are filled in. */
fix_opfuncids((Node *) expr);
/*
* Prepare expr for execution. (Note: we can't use ExecPrepareExpr
* because it'd result in recursively invoking eval_const_expressions.)
*/
exprstate = ExecInitExpr(expr, NULL);
/*
* And evaluate it.
*
* It is OK to use a default econtext because none of the ExecEvalExpr()
* code used in this situation will use econtext. That might seem
* fortuitous, but it's not so unreasonable --- a constant expression does
* not depend on context, by definition, n'est ce pas?
*/
const_val = ExecEvalExprSwitchContext(exprstate,
GetPerTupleExprContext(estate),
&const_is_null, NULL);
/* Get info needed about result datatype */
get_typlenbyval(result_type, &resultTypLen, &resultTypByVal);
/* Get back to outer memory context */
MemoryContextSwitchTo(oldcontext);
/*
* Must copy result out of sub-context used by expression eval.
*
* Also, if it's varlena, forcibly detoast it. This protects us against
* storing TOAST pointers into plans that might outlive the referenced
* data. (makeConst would handle detoasting anyway, but it's worth a few
* extra lines here so that we can do the copy and detoast in one step.)
*/
if (!const_is_null)
{
if (resultTypLen == -1)
const_val = PointerGetDatum(PG_DETOAST_DATUM_COPY(const_val));
else
const_val = datumCopy(const_val, resultTypByVal, resultTypLen);
}
/* Release all the junk we just created */
FreeExecutorState(estate);
/*
* Make the constant result node.
*/
return (Expr *) makeConst(result_type, result_typmod, result_collation,
resultTypLen,
const_val, const_is_null,
resultTypByVal);
}
/* *INDENT-ON* */

View File

@ -458,6 +458,7 @@ _outTask(StringInfo str, const Task *node)
WRITE_BOOL_FIELD(assignmentConstrained); WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution); WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery); WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(requiresMasterEvaluation);
} }

View File

@ -1418,6 +1418,7 @@ _readTask(void)
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery); READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE(); READ_DONE();
} }

View File

@ -1507,6 +1507,7 @@ _readTask(void)
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery); READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_DONE(); READ_DONE();
} }

View File

@ -0,0 +1,20 @@
/*-------------------------------------------------------------------------
*
* citus_clauses.h
* Routines roughly equivalent to postgres' util/clauses.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_CLAUSES_H
#define CITUS_CLAUSES_H
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
extern bool RequiresMasterEvaluation(Query *query);
extern void ExecuteMasterEvaluableFunctions(Query *query);
#endif /* CITUS_CLAUSES_H */

View File

@ -141,6 +141,9 @@ typedef struct MapMergeJob
* as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data * as compute tasks; and shard fetch, map fetch, and merge fetch tasks are data
* fetch tasks. We also forward declare the task execution struct here to avoid * fetch tasks. We also forward declare the task execution struct here to avoid
* including the executor header files. * including the executor header files.
*
* NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask
* in citus_readfuncs to correctly (de)serialize this struct.
*/ */
typedef struct TaskExecution TaskExecution; typedef struct TaskExecution TaskExecution;
@ -156,12 +159,13 @@ typedef struct Task
List *dependedTaskList; /* only applies to compute tasks */ List *dependedTaskList; /* only applies to compute tasks */
uint32 partitionId; uint32 partitionId;
uint32 upstreamTaskId; /* only applies to data fetch tasks */ uint32 upstreamTaskId; /* only applies to data fetch tasks */
ShardInterval *shardInterval; /* only applies to merge tasks */ ShardInterval *shardInterval; /* only applies to merge tasks */
bool assignmentConstrained; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */
uint64 shardId; /* only applies to shard fetch tasks */ uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */ bool upsertQuery; /* only applies to modify tasks */
bool requiresMasterEvaluation; /* only applies to modify tasks */
} Task; } Task;

View File

@ -16,8 +16,7 @@ extern bool AllModificationsCommutative;
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
Task *task);
extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc);

View File

@ -0,0 +1,117 @@
--
-- MULTI_FUNCTION_EVALUATION
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL)
CREATE TABLE example (key INT, value INT);
SELECT master_create_distributed_table('example', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
CREATE SEQUENCE example_value_seq;
SELECT master_create_worker_shards('example', 1, 2);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO example VALUES (1, nextval('example_value_seq'));
SELECT * FROM example;
key | value
-----+-------
1 | 1
(1 row)
-- functions called by prepared statements are also evaluated
PREPARE stmt AS INSERT INTO example VALUES (2);
EXECUTE stmt;
EXECUTE stmt;
SELECT * FROM example;
key | value
-----+-------
1 | 1
2 |
2 |
(3 rows)
-- non-immutable functions inside CASE/COALESCE aren't allowed
ALTER TABLE example DROP value;
ALTER TABLE example ADD value timestamp;
-- this is allowed because there are no mutable funcs in the CASE
UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1;
-- this is allowed because the planner strips away the CASE during constant evaluation
UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1;
-- this is not allowed because there're mutable functions in a CaseWhen clause
-- (which we can't easily evaluate on the master)
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1;
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
-- make sure we also check defresult (the ELSE clause)
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1;
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
-- COALESCE is allowed
UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1;
-- COALESCE is not allowed if there are any mutable functions
UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1;
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1;
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
-- RowCompareExpr's are checked for mutability. These are allowed:
ALTER TABLE example DROP value;
ALTER TABLE example ADD value boolean;
ALTER TABLE example ADD time_col timestamptz;
UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1;
UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1;
-- But this RowCompareExpr is not (it passes Var into STABLE)
UPDATE example SET value = NULLIF(
ROW(date '10-10-1000', 2) < ROW(time_col, 3), true
) WHERE key = 1;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
-- DistinctExpr's are also checked for mutability. These are allowed:
UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1;
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1;
-- But this RowCompare references the STABLE = (date, timestamptz) operator
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed
UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator
UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
-- CoerceViaIO (typoutput -> typinput, a type coercion)
ALTER TABLE example DROP value;
ALTER TABLE example ADD value date;
-- this one is allowed
UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1;
-- this one is not
UPDATE example SET value = time_col::date WHERE key = 1;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
-- ArrayCoerceExpr (applies elemfuncid to each elem)
ALTER TABLE example DROP value;
ALTER TABLE example ADD value date[];
-- this one is allowed
UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1;
-- this one is not
UPDATE example SET value = array[time_col]::date[] WHERE key = 1;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
-- test that UPDATE and DELETE also have the functions in WHERE evaluated
ALTER TABLE example DROP time_col;
ALTER TABLE example DROP value;
ALTER TABLE example ADD value timestamptz;
INSERT INTO example VALUES (3, now());
UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour';
SELECT * FROM example WHERE key = 3;
key | value
-----+------------------------------
3 | Tue Oct 10 00:00:00 2000 PDT
(1 row)
DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour';
SELECT * FROM example WHERE key = 3;
key | value
-----+-------
(0 rows)
DROP TABLE example;

View File

@ -192,16 +192,15 @@ SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported -- commands with non-constant partition values are unsupported
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58); 'sell', 0.58);
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: values given for the partition column must be constants or constant expressions
-- commands with expressions that cannot be collapsed are unsupported -- values for other columns are totally fine
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
-- commands with mutable functions in their quals -- commands with mutable functions in their quals
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
-- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -- commands with mutable but non-volatile functions(ie: stable func.) in their quals
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported -- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification ERROR: cannot perform distributed planning for the given modification
@ -433,12 +432,46 @@ UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWE
246 | gm | GM 246 | gm | GM
(1 row) (1 row)
-- updates referencing non-IMMUTABLE functions are unsupported ALTER TABLE limit_orders ADD COLUMN array_of_values integer[];
UPDATE limit_orders SET placed_at = now() WHERE id = 246; -- updates referencing STABLE functions are allowed
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246;
-- so are binary operators
UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
-- immutable function calls with vars are also allowed
UPDATE limit_orders
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;
CREATE FUNCTION stable_append(old_values int[], new_value int)
RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$
LANGUAGE plpgsql STABLE;
-- but STABLE function calls with vars are not allowed
UPDATE limit_orders
SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246;
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
SELECT array_of_values FROM limit_orders WHERE id = 246;
array_of_values
-----------------
{1,2}
(1 row)
-- STRICT functions work as expected
CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
ERROR: null value in column "bidder_id" violates not-null constraint
DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}).
CONTEXT: while executing command on localhost:57637
SELECT array_of_values FROM limit_orders WHERE id = 246;
array_of_values
-----------------
{1,2}
(1 row)
ALTER TABLE limit_orders DROP array_of_values;
-- even in RETURNING -- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause
-- cursors are not supported -- cursors are not supported
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard

View File

@ -34,13 +34,9 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table
ERROR: relation "temporary_nondistributed_table" is not a distributed table ERROR: relation "temporary_nondistributed_table" is not a distributed table
-- commands with volatile functions in their quals -- commands with volatile functions in their quals
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
-- commands with stable functions in their quals
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
-- commands with immutable functions in their quals -- commands with immutable functions in their quals
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
master_modify_multiple_shards master_modify_multiple_shards
@ -235,9 +231,14 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
47 47
(1 row) (1 row)
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
-- updates referencing non-IMMUTABLE functions are unsupported -- updates referencing non-IMMUTABLE functions are unsupported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE master_modify_multiple_shards
-------------------------------
1
(1 row)
-- updates referencing IMMUTABLE functions in SET section are supported -- updates referencing IMMUTABLE functions in SET section are supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
master_modify_multiple_shards master_modify_multiple_shards
@ -251,10 +252,21 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
78 78
(1 row) (1 row)
-- updates referencing STABLE functions in SET section are not supported -- updates referencing STABLE functions in SET section are supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE master_modify_multiple_shards
-------------------------------
1
(1 row)
-- updates referencing VOLATILE functions in SET section are not supported -- updates referencing VOLATILE functions in SET section are not supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
-- commands with stable functions in their quals are allowed
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
master_modify_multiple_shards
-------------------------------
1
(1 row)
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;

View File

@ -242,15 +242,15 @@ DETAIL: Subqueries are not supported in distributed modifications.
-- non mutable function call in the SET -- non mutable function call in the SET
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = random()::int; UPDATE SET other_col = random()::int;
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
-- non mutable function call in the WHERE -- non mutable function call in the WHERE
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int; UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int;
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
-- non mutable function call in the arbiter WHERE -- non mutable function call in the arbiter WHERE
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int
DO UPDATE SET other_col = 5; DO UPDATE SET other_col = 5;
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
-- error out on attempt to update the partition key -- error out on attempt to update the partition key
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET part_key = 15; UPDATE SET part_key = 15;

View File

@ -154,3 +154,8 @@ test: multi_drop_extension
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem # multi_schema_support makes sure we can work with tables in schemas other than public with no problem
# ---------- # ----------
test: multi_schema_support test: multi_schema_support
# ----------
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
# ----------
test: multi_function_evaluation

View File

@ -0,0 +1,114 @@
--
-- MULTI_FUNCTION_EVALUATION
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL)
CREATE TABLE example (key INT, value INT);
SELECT master_create_distributed_table('example', 'key', 'hash');
CREATE SEQUENCE example_value_seq;
SELECT master_create_worker_shards('example', 1, 2);
INSERT INTO example VALUES (1, nextval('example_value_seq'));
SELECT * FROM example;
-- functions called by prepared statements are also evaluated
PREPARE stmt AS INSERT INTO example VALUES (2);
EXECUTE stmt;
EXECUTE stmt;
SELECT * FROM example;
-- non-immutable functions inside CASE/COALESCE aren't allowed
ALTER TABLE example DROP value;
ALTER TABLE example ADD value timestamp;
-- this is allowed because there are no mutable funcs in the CASE
UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1;
-- this is allowed because the planner strips away the CASE during constant evaluation
UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1;
-- this is not allowed because there're mutable functions in a CaseWhen clause
-- (which we can't easily evaluate on the master)
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1;
-- make sure we also check defresult (the ELSE clause)
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1;
-- COALESCE is allowed
UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1;
-- COALESCE is not allowed if there are any mutable functions
UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1;
UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1;
-- RowCompareExpr's are checked for mutability. These are allowed:
ALTER TABLE example DROP value;
ALTER TABLE example ADD value boolean;
ALTER TABLE example ADD time_col timestamptz;
UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1;
UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1;
-- But this RowCompareExpr is not (it passes Var into STABLE)
UPDATE example SET value = NULLIF(
ROW(date '10-10-1000', 2) < ROW(time_col, 3), true
) WHERE key = 1;
-- DistinctExpr's are also checked for mutability. These are allowed:
UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1;
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1;
-- But this RowCompare references the STABLE = (date, timestamptz) operator
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1;
-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed
UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator
UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
-- CoerceViaIO (typoutput -> typinput, a type coercion)
ALTER TABLE example DROP value;
ALTER TABLE example ADD value date;
-- this one is allowed
UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1;
-- this one is not
UPDATE example SET value = time_col::date WHERE key = 1;
-- ArrayCoerceExpr (applies elemfuncid to each elem)
ALTER TABLE example DROP value;
ALTER TABLE example ADD value date[];
-- this one is allowed
UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1;
-- this one is not
UPDATE example SET value = array[time_col]::date[] WHERE key = 1;
-- test that UPDATE and DELETE also have the functions in WHERE evaluated
ALTER TABLE example DROP time_col;
ALTER TABLE example DROP value;
ALTER TABLE example ADD value timestamptz;
INSERT INTO example VALUES (3, now());
UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour';
SELECT * FROM example WHERE key = 3;
DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour';
SELECT * FROM example WHERE key = 3;
DROP TABLE example;

View File

@ -137,14 +137,15 @@ SET client_min_messages TO DEFAULT;
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58); 'sell', 0.58);
-- commands with expressions that cannot be collapsed are unsupported -- values for other columns are totally fine
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
-- commands with mutable functions in their quals -- commands with mutable functions in their quals
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
-- commands with mutable but non-volatilte functions(ie: stable func.) in their quals -- commands with mutable but non-volatile functions(ie: stable func.) in their quals
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported -- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
@ -310,8 +311,38 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
-- IMMUTABLE functions are allowed -- even in returning -- IMMUTABLE functions are allowed -- even in returning
UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol;
-- updates referencing non-IMMUTABLE functions are unsupported ALTER TABLE limit_orders ADD COLUMN array_of_values integer[];
UPDATE limit_orders SET placed_at = now() WHERE id = 246;
-- updates referencing STABLE functions are allowed
UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246;
-- so are binary operators
UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
-- immutable function calls with vars are also allowed
UPDATE limit_orders
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;
CREATE FUNCTION stable_append(old_values int[], new_value int)
RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$
LANGUAGE plpgsql STABLE;
-- but STABLE function calls with vars are not allowed
UPDATE limit_orders
SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246;
SELECT array_of_values FROM limit_orders WHERE id = 246;
-- STRICT functions work as expected
CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
SELECT array_of_values FROM limit_orders WHERE id = 246;
ALTER TABLE limit_orders DROP array_of_values;
-- even in RETURNING -- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();

View File

@ -60,10 +60,6 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
-- commands with stable functions in their quals
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
-- commands with immutable functions in their quals -- commands with immutable functions in their quals
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)'); SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
@ -140,6 +136,8 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10');
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
-- updates referencing non-IMMUTABLE functions are unsupported -- updates referencing non-IMMUTABLE functions are unsupported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
@ -147,10 +145,13 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10; SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
-- updates referencing STABLE functions in SET section are not supported -- updates referencing STABLE functions in SET section are supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
-- updates referencing VOLATILE functions in SET section are not supported -- updates referencing VOLATILE functions in SET section are not supported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10'); SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
-- commands with stable functions in their quals are allowed
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;