mirror of https://github.com/citusdata/citus.git
Evaluate functions on the master
- Enables using VOLATILE functions (like nextval()) in INSERT queries - Enables using STABLE functions (like now()) targetLists and joinTrees UPDATE and INSERT can now contain non-immutable functions. INSERT can contain any kind of expression, while UPDATE can contain any STABLE function, so long as a Var is not passed into the STABLE function, even indirectly. UPDATE TagetEntry's can now also include Vars. There's an exception, CASE/COALESCE statements may not contain mutable functions. Functions calls in master_modify_multiple_shards are also evaluated.pull/488/head
parent
b01d19db3d
commit
ae91768c96
|
@ -206,18 +206,8 @@ multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
|
||||||
|
|
||||||
if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR)
|
if (eflags & EXEC_FLAG_CITUS_ROUTER_EXECUTOR)
|
||||||
{
|
{
|
||||||
Task *task = NULL;
|
|
||||||
PlannedStmt *planStatement = queryDesc->plannedstmt;
|
|
||||||
MultiPlan *multiPlan = GetMultiPlan(planStatement);
|
|
||||||
List *taskList = multiPlan->workerJob->taskList;
|
|
||||||
|
|
||||||
/* router executor can only execute distributed plans with a single task */
|
|
||||||
Assert(list_length(taskList) == 1);
|
|
||||||
|
|
||||||
task = (Task *) linitial(taskList);
|
|
||||||
|
|
||||||
/* drop into the router executor */
|
/* drop into the router executor */
|
||||||
RouterExecutorRun(queryDesc, direction, count, task);
|
RouterExecutorRun(queryDesc, direction, count);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -19,27 +19,31 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
#include "distributed/citus_clauses.h"
|
||||||
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "executor/executor.h"
|
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
#include "optimizer/clauses.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
|
||||||
#include "utils/elog.h"
|
#include "utils/elog.h"
|
||||||
#include "utils/errcodes.h"
|
#include "utils/errcodes.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
#include "utils/int8.h"
|
#include "utils/int8.h"
|
||||||
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
|
#include "utils/ruleutils.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
/* controls use of locks to enforce safe commutativity */
|
/* controls use of locks to enforce safe commutativity */
|
||||||
bool AllModificationsCommutative = false;
|
bool AllModificationsCommutative = false;
|
||||||
|
|
||||||
|
|
||||||
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
||||||
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
||||||
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
|
@ -49,6 +53,7 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||||
DestReceiver *destination,
|
DestReceiver *destination,
|
||||||
Tuplestorestate *tupleStore);
|
Tuplestorestate *tupleStore);
|
||||||
|
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
|
||||||
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
|
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
|
||||||
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
TupleDesc tupleDescriptor, int64 *rows);
|
TupleDesc tupleDescriptor, int64 *rows);
|
||||||
|
@ -179,8 +184,12 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
|
||||||
* RouterExecutorRun actually executes a single task on a worker.
|
* RouterExecutorRun actually executes a single task on a worker.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task)
|
RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
|
||||||
{
|
{
|
||||||
|
PlannedStmt *planStatement = queryDesc->plannedstmt;
|
||||||
|
MultiPlan *multiPlan = GetMultiPlan(planStatement);
|
||||||
|
List *taskList = multiPlan->workerJob->taskList;
|
||||||
|
Task *task = NULL;
|
||||||
EState *estate = queryDesc->estate;
|
EState *estate = queryDesc->estate;
|
||||||
CmdType operation = queryDesc->operation;
|
CmdType operation = queryDesc->operation;
|
||||||
MemoryContext oldcontext = NULL;
|
MemoryContext oldcontext = NULL;
|
||||||
|
@ -188,6 +197,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
|
||||||
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
|
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
|
||||||
bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;
|
bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;
|
||||||
|
|
||||||
|
/* router executor can only execute distributed plans with a single task */
|
||||||
|
Assert(list_length(taskList) == 1);
|
||||||
|
task = (Task *) linitial(taskList);
|
||||||
|
|
||||||
Assert(estate != NULL);
|
Assert(estate != NULL);
|
||||||
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
|
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
|
||||||
Assert(task != NULL);
|
Assert(task != NULL);
|
||||||
|
@ -309,6 +322,22 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
ListCell *failedPlacementCell = NULL;
|
ListCell *failedPlacementCell = NULL;
|
||||||
int64 affectedTupleCount = -1;
|
int64 affectedTupleCount = -1;
|
||||||
bool gotResults = false;
|
bool gotResults = false;
|
||||||
|
char *queryString = task->queryString;
|
||||||
|
|
||||||
|
if (isModificationQuery)
|
||||||
|
{
|
||||||
|
PlannedStmt *planStatement = queryDesc->plannedstmt;
|
||||||
|
MultiPlan *multiPlan = GetMultiPlan(planStatement);
|
||||||
|
Query *query = multiPlan->workerJob->jobQuery;
|
||||||
|
StringInfo queryStringInfo = makeStringInfo();
|
||||||
|
|
||||||
|
ExecuteFunctions(query);
|
||||||
|
DeparseShardQuery(query, task, queryStringInfo);
|
||||||
|
queryString = queryStringInfo->data;
|
||||||
|
|
||||||
|
elog(DEBUG4, "old query: %s", task->queryString);
|
||||||
|
elog(DEBUG4, "new query: %s", queryString);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to run the query to completion on one placement. If the query fails
|
* Try to run the query to completion on one placement. If the query fails
|
||||||
|
@ -329,7 +358,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
queryOK = SendQueryInSingleRowMode(connection, task->queryString);
|
queryOK = SendQueryInSingleRowMode(connection, queryString);
|
||||||
if (!queryOK)
|
if (!queryOK)
|
||||||
{
|
{
|
||||||
PurgeConnection(connection);
|
PurgeConnection(connection);
|
||||||
|
@ -425,6 +454,16 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
DeparseShardQuery(Query *query, Task *task, StringInfo queryString)
|
||||||
|
{
|
||||||
|
uint64 shardId = task->anchorShardId;
|
||||||
|
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
|
||||||
|
|
||||||
|
deparse_shard_query(query, relid, shardId, queryString);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
|
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
|
||||||
* receiver. It performs the necessary limiting to support cursors.
|
* receiver. It performs the necessary limiting to support cursors.
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/event_trigger.h"
|
#include "commands/event_trigger.h"
|
||||||
|
#include "distributed/citus_clauses.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
@ -124,6 +125,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
errmsg("master_modify_multiple_shards() does not support RETURNING")));
|
errmsg("master_modify_multiple_shards() does not support RETURNING")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ExecuteFunctions(modifyQuery);
|
||||||
|
|
||||||
shardIntervalList = LoadShardIntervalList(relationId);
|
shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
restrictClauseList = WhereClauseList(modifyQuery->jointree);
|
restrictClauseList = WhereClauseList(modifyQuery->jointree);
|
||||||
|
|
||||||
|
|
|
@ -51,11 +51,23 @@
|
||||||
#include "utils/errcodes.h"
|
#include "utils/errcodes.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/relcache.h"
|
|
||||||
#include "utils/typcache.h"
|
#include "catalog/pg_proc.h"
|
||||||
|
#include "optimizer/planmain.h"
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
bool containsVar;
|
||||||
|
bool varArgument;
|
||||||
|
bool badCoalesce;
|
||||||
|
} WalkerState;
|
||||||
|
|
||||||
|
|
||||||
/* planner functions forward declarations */
|
/* planner functions forward declarations */
|
||||||
|
static bool ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument,
|
||||||
|
bool *badCoalesce);
|
||||||
|
static bool ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state);
|
||||||
|
static char MostPermissiveVolatileFlag(char left, char right);
|
||||||
static Task * RouterModifyTask(Query *query);
|
static Task * RouterModifyTask(Query *query);
|
||||||
#if (PG_VERSION_NUM >= 90500)
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
static OnConflictExpr * RebuildOnConflict(Oid relationId,
|
static OnConflictExpr * RebuildOnConflict(Oid relationId,
|
||||||
|
@ -143,8 +155,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
bool hasValuesScan = false;
|
bool hasValuesScan = false;
|
||||||
uint32 queryTableCount = 0;
|
uint32 queryTableCount = 0;
|
||||||
bool hasNonConstTargetEntryExprs = false;
|
|
||||||
bool hasNonConstQualExprs = false;
|
|
||||||
bool specifiesPartitionValue = false;
|
bool specifiesPartitionValue = false;
|
||||||
#if (PG_VERSION_NUM >= 90500)
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
ListCell *setTargetCell = NULL;
|
ListCell *setTargetCell = NULL;
|
||||||
|
@ -258,6 +268,8 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||||
commandType == CMD_DELETE)
|
commandType == CMD_DELETE)
|
||||||
{
|
{
|
||||||
|
bool hasVarArgument = false; /* A STABLE function is passed a Var argument */
|
||||||
|
bool hasBadCoalesce = false; /* CASE/COALESCE passed a mutable function */
|
||||||
FromExpr *joinTree = NULL;
|
FromExpr *joinTree = NULL;
|
||||||
ListCell *targetEntryCell = NULL;
|
ListCell *targetEntryCell = NULL;
|
||||||
|
|
||||||
|
@ -271,9 +283,12 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (contain_mutable_functions((Node *) targetEntry->expr))
|
if (commandType == CMD_UPDATE &&
|
||||||
|
contain_volatile_functions((Node *) targetEntry->expr))
|
||||||
{
|
{
|
||||||
hasNonConstTargetEntryExprs = true;
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("functions used in UPDATE queries on distributed "
|
||||||
|
"tables must not be VOLATILE")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE &&
|
if (commandType == CMD_UPDATE &&
|
||||||
|
@ -281,17 +296,58 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
{
|
{
|
||||||
specifiesPartitionValue = true;
|
specifiesPartitionValue = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (targetEntry->resno == partitionColumn->varattno &&
|
||||||
|
!IsA(targetEntry->expr, Const))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("values given for the partition column must be"
|
||||||
|
" constants or constant expressions")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE &&
|
||||||
|
ContainsDisallowedFunctionCalls((Node *) targetEntry->expr,
|
||||||
|
&hasVarArgument, &hasBadCoalesce))
|
||||||
|
{
|
||||||
|
Assert(hasVarArgument || hasBadCoalesce);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
joinTree = queryTree->jointree;
|
joinTree = queryTree->jointree;
|
||||||
if (joinTree != NULL && contain_mutable_functions(joinTree->quals))
|
if (joinTree != NULL)
|
||||||
{
|
{
|
||||||
hasNonConstQualExprs = true;
|
if (contain_volatile_functions(joinTree->quals))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("functions used in the WHERE clause of modification "
|
||||||
|
"queries on distributed tables must not be VOLATILE")));
|
||||||
|
}
|
||||||
|
else if (ContainsDisallowedFunctionCalls(joinTree->quals, &hasVarArgument,
|
||||||
|
&hasBadCoalesce))
|
||||||
|
{
|
||||||
|
Assert(hasVarArgument || hasBadCoalesce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasVarArgument)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("STABLE functions used in UPDATE queries"
|
||||||
|
" cannot be called with column references")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasBadCoalesce)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("non-IMMUTABLE functions are not allowed in CASE or"
|
||||||
|
" COALESCE statements")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (contain_mutable_functions((Node *) queryTree->returningList))
|
if (contain_mutable_functions((Node *) queryTree->returningList))
|
||||||
{
|
{
|
||||||
hasNonConstTargetEntryExprs = true;
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("non-IMMUTABLE functions are not allowed in the"
|
||||||
|
" RETURNING clause")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,7 +398,9 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
}
|
}
|
||||||
else if (contain_mutable_functions((Node *) setTargetEntry->expr))
|
else if (contain_mutable_functions((Node *) setTargetEntry->expr))
|
||||||
{
|
{
|
||||||
hasNonConstTargetEntryExprs = true;
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("functions used in the DO UPDATE SET clause of INSERTs "
|
||||||
|
"on distributed tables must be marked IMMUTABLE")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -351,17 +409,13 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
if (contain_mutable_functions((Node *) arbiterWhere) ||
|
if (contain_mutable_functions((Node *) arbiterWhere) ||
|
||||||
contain_mutable_functions((Node *) onConflictWhere))
|
contain_mutable_functions((Node *) onConflictWhere))
|
||||||
{
|
{
|
||||||
hasNonConstQualExprs = true;
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("functions used in the WHERE clause of the ON CONFLICT "
|
||||||
|
"clause of INSERTs on distributed tables must be marked "
|
||||||
|
"IMMUTABLE")));
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (hasNonConstTargetEntryExprs || hasNonConstQualExprs)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("functions used in modification queries on distributed "
|
|
||||||
"tables must be marked IMMUTABLE")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (specifiesPartitionValue)
|
if (specifiesPartitionValue)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -370,6 +424,268 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the expression contains STABLE functions which accept any parameters derived from a
|
||||||
|
* Var returns true and sets varArgument.
|
||||||
|
*
|
||||||
|
* If the expression contains a CASE or COALESCE which invoke non-IMMUTABLE functions
|
||||||
|
* returns true and sets badCoalesce.
|
||||||
|
*
|
||||||
|
* Assumes the expression contains no VOLATILE functions.
|
||||||
|
*
|
||||||
|
* Var's are allowed, but only if they are passed solely to IMMUTABLE functions
|
||||||
|
*
|
||||||
|
* We special-case CASE/COALESCE because those are evaluated lazily. We could evaluate
|
||||||
|
* CASE/COALESCE expressions which don't reference Vars, or partially evaluate some
|
||||||
|
* which do, but for now we just error out. That makes both the code and user-education
|
||||||
|
* easier.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ContainsDisallowedFunctionCalls(Node *expression, bool *varArgument, bool *badCoalesce)
|
||||||
|
{
|
||||||
|
bool result;
|
||||||
|
WalkerState data;
|
||||||
|
data.containsVar = data.varArgument = data.badCoalesce = false;
|
||||||
|
|
||||||
|
result = ContainsDisallowedFunctionCallsWalker(expression, &data);
|
||||||
|
|
||||||
|
*varArgument |= data.varArgument;
|
||||||
|
*badCoalesce |= data.badCoalesce;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
ContainsDisallowedFunctionCallsWalker(Node *expression, WalkerState *state)
|
||||||
|
{
|
||||||
|
char volatileFlag = 0;
|
||||||
|
WalkerState childState;
|
||||||
|
bool containsDisallowedFunction = false;
|
||||||
|
|
||||||
|
childState.containsVar = childState.varArgument = childState.badCoalesce = false;
|
||||||
|
|
||||||
|
if (expression == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, CoalesceExpr))
|
||||||
|
{
|
||||||
|
CoalesceExpr* expr = (CoalesceExpr *) expression;
|
||||||
|
|
||||||
|
if (contain_mutable_functions((Node *) (expr->args)))
|
||||||
|
{
|
||||||
|
state->badCoalesce = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* There's no need to recurse. Since there are no STABLE functions
|
||||||
|
* varArgument will never be set.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, CaseExpr))
|
||||||
|
{
|
||||||
|
CaseExpr* expr = (CaseExpr *) expression;
|
||||||
|
ListCell *temp;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* contain_mutable_functions doesn't know what to do with CaseWhen so we
|
||||||
|
* have to break it out ourselves
|
||||||
|
*/
|
||||||
|
foreach(temp, expr->args)
|
||||||
|
{
|
||||||
|
CaseWhen *when = (CaseWhen *) lfirst(temp);
|
||||||
|
Assert(IsA(when, CaseWhen));
|
||||||
|
|
||||||
|
if (contain_mutable_functions((Node *) when->expr) ||
|
||||||
|
contain_mutable_functions((Node *) when->result))
|
||||||
|
{
|
||||||
|
state->badCoalesce = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (contain_mutable_functions((Node *) expr->defresult))
|
||||||
|
{
|
||||||
|
state->badCoalesce = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ContainsDisallowedFunctionCallsWalker((Node *) (expr->arg), state);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, Var))
|
||||||
|
{
|
||||||
|
state->containsVar = true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In order for statement replication to give us consistent results it's important
|
||||||
|
* that we either disallow or evaluate on the master anything which has a volatility
|
||||||
|
* category above IMMUTABLE. Newer versions of postgres might add node types which
|
||||||
|
* should be checked in this function.
|
||||||
|
*
|
||||||
|
* Look through contain_mutable_functions_walker or future PG's equivalent for new
|
||||||
|
* node types before bumping this version number to fix compilation.
|
||||||
|
*
|
||||||
|
* Once you've added them to this check, make sure you also evaluate them in the
|
||||||
|
* executor!
|
||||||
|
*/
|
||||||
|
StaticAssertStmt(PG_VERSION_NUM <= 90503, "When porting to a newer PG this section"
|
||||||
|
" needs to be reviewed.");
|
||||||
|
|
||||||
|
if (IsA(expression, OpExpr))
|
||||||
|
{
|
||||||
|
OpExpr *expr = (OpExpr *) expression;
|
||||||
|
|
||||||
|
set_opfuncid(expr);
|
||||||
|
volatileFlag = func_volatile(expr->opfuncid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, FuncExpr))
|
||||||
|
{
|
||||||
|
FuncExpr *expr = (FuncExpr *) expression;
|
||||||
|
|
||||||
|
volatileFlag = func_volatile(expr->funcid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, DistinctExpr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* to exercise this, you need to create a custom type for which the '=' operator
|
||||||
|
* is STABLE/VOLATILE
|
||||||
|
*/
|
||||||
|
DistinctExpr *expr = (DistinctExpr *) expression;
|
||||||
|
|
||||||
|
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
|
||||||
|
volatileFlag = func_volatile(expr->opfuncid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, NullIfExpr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* same as above, exercising this requires a STABLE/VOLATILE '=' operator
|
||||||
|
*/
|
||||||
|
NullIfExpr *expr = (NullIfExpr *) expression;
|
||||||
|
|
||||||
|
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
|
||||||
|
volatileFlag = func_volatile(expr->opfuncid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, ScalarArrayOpExpr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* to exercise this you need to CREATE OPERATOR with a binary predicate
|
||||||
|
* and use it within an ANY/ALL clause.
|
||||||
|
*/
|
||||||
|
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
|
||||||
|
|
||||||
|
set_sa_opfuncid(expr);
|
||||||
|
volatileFlag = func_volatile(expr->opfuncid);
|
||||||
|
}
|
||||||
|
else if (IsA(expression, CoerceViaIO))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* to exercise this you need to use a type with a STABLE/VOLATILE intype or
|
||||||
|
* outtype.
|
||||||
|
*/
|
||||||
|
CoerceViaIO *expr = (CoerceViaIO *) expression;
|
||||||
|
Oid iofunc;
|
||||||
|
Oid typioparam;
|
||||||
|
bool typisvarlena;
|
||||||
|
|
||||||
|
/* check the result type's input function */
|
||||||
|
getTypeInputInfo(expr->resulttype,
|
||||||
|
&iofunc, &typioparam);
|
||||||
|
volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc));
|
||||||
|
|
||||||
|
/* check the input type's output function */
|
||||||
|
getTypeOutputInfo(exprType((Node *) expr->arg),
|
||||||
|
&iofunc, &typisvarlena);
|
||||||
|
volatileFlag = MostPermissiveVolatileFlag(volatileFlag, func_volatile(iofunc));
|
||||||
|
}
|
||||||
|
else if (IsA(expression, ArrayCoerceExpr))
|
||||||
|
{
|
||||||
|
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
|
||||||
|
|
||||||
|
if (OidIsValid(expr->elemfuncid))
|
||||||
|
{
|
||||||
|
volatileFlag = func_volatile(expr->elemfuncid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(expression, RowCompareExpr))
|
||||||
|
{
|
||||||
|
RowCompareExpr *rcexpr = (RowCompareExpr *) expression;
|
||||||
|
ListCell *opid;
|
||||||
|
|
||||||
|
foreach(opid, rcexpr->opnos)
|
||||||
|
{
|
||||||
|
volatileFlag = MostPermissiveVolatileFlag(volatileFlag,
|
||||||
|
op_volatile(lfirst_oid(opid)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(expression, Query))
|
||||||
|
{
|
||||||
|
/* subqueries aren't allowed and fail before control reaches this point */
|
||||||
|
Assert(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (volatileFlag == PROVOLATILE_VOLATILE)
|
||||||
|
{
|
||||||
|
/* the caller should have already checked for this */
|
||||||
|
Assert(false);
|
||||||
|
}
|
||||||
|
else if (volatileFlag == PROVOLATILE_STABLE)
|
||||||
|
{
|
||||||
|
containsDisallowedFunction =
|
||||||
|
expression_tree_walker(expression,
|
||||||
|
ContainsDisallowedFunctionCallsWalker,
|
||||||
|
&childState);
|
||||||
|
|
||||||
|
if (childState.containsVar)
|
||||||
|
{
|
||||||
|
state->varArgument = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
state->badCoalesce |= childState.badCoalesce;
|
||||||
|
state->varArgument |= childState.varArgument;
|
||||||
|
|
||||||
|
return (containsDisallowedFunction || childState.containsVar);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* keep traversing */
|
||||||
|
return expression_tree_walker(expression,
|
||||||
|
ContainsDisallowedFunctionCallsWalker,
|
||||||
|
state);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return the most-pessimistic volatility flag of the two params.
|
||||||
|
*
|
||||||
|
* for example: given two flags, if one is stable and one is volatile, an expression
|
||||||
|
* involving both is volatile.
|
||||||
|
*/
|
||||||
|
char
|
||||||
|
MostPermissiveVolatileFlag(char left, char right)
|
||||||
|
{
|
||||||
|
if (left == PROVOLATILE_VOLATILE || right == PROVOLATILE_VOLATILE)
|
||||||
|
{
|
||||||
|
return PROVOLATILE_VOLATILE;
|
||||||
|
}
|
||||||
|
else if (left == PROVOLATILE_STABLE || right == PROVOLATILE_STABLE)
|
||||||
|
{
|
||||||
|
return PROVOLATILE_STABLE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return PROVOLATILE_IMMUTABLE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterModifyTask builds a Task to represent a modification performed by
|
* RouterModifyTask builds a Task to represent a modification performed by
|
||||||
* the provided query against the provided shard interval. This task contains
|
* the provided query against the provided shard interval. This task contains
|
||||||
|
|
|
@ -0,0 +1,310 @@
|
||||||
|
/*
|
||||||
|
* citus_clauses.c
|
||||||
|
*
|
||||||
|
* Routines roughly equivalent to postgres' util/clauses.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016-2016, Citus Data, Inc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "distributed/citus_clauses.h"
|
||||||
|
|
||||||
|
#include "catalog/pg_type.h"
|
||||||
|
#include "executor/executor.h"
|
||||||
|
#include "nodes/makefuncs.h"
|
||||||
|
#include "nodes/nodeFuncs.h"
|
||||||
|
#include "nodes/nodes.h"
|
||||||
|
#include "nodes/primnodes.h"
|
||||||
|
#include "optimizer/clauses.h"
|
||||||
|
#include "optimizer/planmain.h"
|
||||||
|
#include "utils/datum.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
static Node * PartiallyEvaluateExpression(Node *expression);
|
||||||
|
static Node * EvaluateNodeIfReferencesFunction(Node *expression);
|
||||||
|
static Node * PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar);
|
||||||
|
static Expr * citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
|
||||||
|
Oid result_collation);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Walks each TargetEntry of the query, evaluates sub-expressions without Vars.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecuteFunctions(Query *query)
|
||||||
|
{
|
||||||
|
CmdType commandType = query->commandType;
|
||||||
|
ListCell *targetEntryCell = NULL;
|
||||||
|
Node *modifiedNode = NULL;
|
||||||
|
|
||||||
|
if (query->jointree && query->jointree->quals)
|
||||||
|
{
|
||||||
|
query->jointree->quals = PartiallyEvaluateExpression(query->jointree->quals);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(targetEntryCell, query->targetList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
|
|
||||||
|
/* performance optimization for the most common cases */
|
||||||
|
if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commandType == CMD_INSERT)
|
||||||
|
{
|
||||||
|
modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr);
|
||||||
|
}
|
||||||
|
|
||||||
|
targetEntry->expr = (Expr *) modifiedNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(query->jointree)
|
||||||
|
{
|
||||||
|
Assert(!contain_mutable_functions((Node *) (query->jointree->quals)));
|
||||||
|
}
|
||||||
|
Assert(!contain_mutable_functions((Node *) (query->targetList)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Walks the expression, evaluating any STABLE or IMMUTABLE functions so long as they
|
||||||
|
* don't reference Vars.
|
||||||
|
*/
|
||||||
|
static Node *
|
||||||
|
PartiallyEvaluateExpression(Node *expression)
|
||||||
|
{
|
||||||
|
bool unused;
|
||||||
|
return PartiallyEvaluateExpressionWalker(expression, &unused);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When you find a function call evaluate it, the planner made sure there were no Vars
|
||||||
|
*
|
||||||
|
* Tell the parent whether you are a Var. If your child was a var tell your parent
|
||||||
|
*
|
||||||
|
* A little inefficient. It goes to the bottom of the tree then calls EvaluateExpression
|
||||||
|
* on each function on the way back up. Say we had an expression with no Vars, we could
|
||||||
|
* only call EvaluateExpression on the top-most level and get the same result.
|
||||||
|
*/
|
||||||
|
static Node *
|
||||||
|
PartiallyEvaluateExpressionWalker(Node *expression, bool *containsVar)
|
||||||
|
{
|
||||||
|
bool childContainsVar = false;
|
||||||
|
Node *copy = NULL;
|
||||||
|
|
||||||
|
if (expression == NULL)
|
||||||
|
{
|
||||||
|
return expression;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pass any argument lists back to the mutator to copy and recurse for us */
|
||||||
|
if (IsA(expression, List))
|
||||||
|
{
|
||||||
|
return expression_tree_mutator(expression,
|
||||||
|
PartiallyEvaluateExpressionWalker,
|
||||||
|
containsVar);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, Var))
|
||||||
|
{
|
||||||
|
*containsVar = true;
|
||||||
|
|
||||||
|
/* makes a copy for us */
|
||||||
|
return expression_tree_mutator(expression,
|
||||||
|
PartiallyEvaluateExpressionWalker,
|
||||||
|
containsVar);
|
||||||
|
}
|
||||||
|
|
||||||
|
copy = expression_tree_mutator(expression,
|
||||||
|
PartiallyEvaluateExpressionWalker,
|
||||||
|
&childContainsVar);
|
||||||
|
|
||||||
|
if (childContainsVar)
|
||||||
|
{
|
||||||
|
*containsVar = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
copy = EvaluateNodeIfReferencesFunction(copy);
|
||||||
|
}
|
||||||
|
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Used to evaluate functions during queries on the master before sending them to workers
|
||||||
|
*
|
||||||
|
* The idea isn't to evaluate every kind of expression, just the kinds whoes result might
|
||||||
|
* change between invocations (the idea is to allow users to use functions but still have
|
||||||
|
* consistent shard replicas, since we use statement replication). This means evaluating
|
||||||
|
* all nodes which invoke functions which might not be IMMUTABLE.
|
||||||
|
*/
|
||||||
|
static Node *
|
||||||
|
EvaluateNodeIfReferencesFunction(Node *expression)
|
||||||
|
{
|
||||||
|
if (IsA(expression, FuncExpr))
|
||||||
|
{
|
||||||
|
FuncExpr *expr = (FuncExpr *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||||
|
expr->funcresulttype,
|
||||||
|
exprTypmod((Node *) expr),
|
||||||
|
expr->funccollid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, OpExpr) ||
|
||||||
|
IsA(expression, DistinctExpr) ||
|
||||||
|
IsA(expression, NullIfExpr))
|
||||||
|
{
|
||||||
|
/* structural equivalence */
|
||||||
|
OpExpr *expr = (OpExpr *) expression;
|
||||||
|
|
||||||
|
/* typemod is always -1? */
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||||
|
expr->opresulttype, -1,
|
||||||
|
expr->opcollid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, DistinctExpr))
|
||||||
|
{
|
||||||
|
DistinctExpr *expr = (DistinctExpr *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||||
|
expr->opresulttype,
|
||||||
|
exprTypmod((Node *) expr),
|
||||||
|
expr->opcollid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, CoerceViaIO))
|
||||||
|
{
|
||||||
|
CoerceViaIO *expr = (CoerceViaIO *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||||
|
expr->resulttype, -1,
|
||||||
|
expr->resultcollid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, ArrayCoerceExpr))
|
||||||
|
{
|
||||||
|
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||||
|
expr->resulttype,
|
||||||
|
expr->resulttypmod,
|
||||||
|
expr->resultcollid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, ScalarArrayOpExpr))
|
||||||
|
{
|
||||||
|
/* TODO: Test this! */
|
||||||
|
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(expression, RowCompareExpr))
|
||||||
|
{
|
||||||
|
/* TODO: Test this! */
|
||||||
|
RowCompareExpr *expr = (RowCompareExpr *) expression;
|
||||||
|
|
||||||
|
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid);
|
||||||
|
}
|
||||||
|
|
||||||
|
return expression;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* a copy of pg's evaluate_expr, pre-evaluate a constant expression
|
||||||
|
*
|
||||||
|
* We use the executor's routine ExecEvalExpr() to avoid duplication of
|
||||||
|
* code and ensure we get the same result as the executor would get.
|
||||||
|
*
|
||||||
|
* *INDENT-OFF*
|
||||||
|
*/
|
||||||
|
static Expr *
|
||||||
|
citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod,
|
||||||
|
Oid result_collation)
|
||||||
|
{
|
||||||
|
EState *estate;
|
||||||
|
ExprState *exprstate;
|
||||||
|
MemoryContext oldcontext;
|
||||||
|
Datum const_val;
|
||||||
|
bool const_is_null;
|
||||||
|
int16 resultTypLen;
|
||||||
|
bool resultTypByVal;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To use the executor, we need an EState.
|
||||||
|
*/
|
||||||
|
estate = CreateExecutorState();
|
||||||
|
|
||||||
|
/* We can use the estate's working context to avoid memory leaks. */
|
||||||
|
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
|
||||||
|
|
||||||
|
/* Make sure any opfuncids are filled in. */
|
||||||
|
fix_opfuncids((Node *) expr);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prepare expr for execution. (Note: we can't use ExecPrepareExpr
|
||||||
|
* because it'd result in recursively invoking eval_const_expressions.)
|
||||||
|
*/
|
||||||
|
exprstate = ExecInitExpr(expr, NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* And evaluate it.
|
||||||
|
*
|
||||||
|
* It is OK to use a default econtext because none of the ExecEvalExpr()
|
||||||
|
* code used in this situation will use econtext. That might seem
|
||||||
|
* fortuitous, but it's not so unreasonable --- a constant expression does
|
||||||
|
* not depend on context, by definition, n'est ce pas?
|
||||||
|
*/
|
||||||
|
const_val = ExecEvalExprSwitchContext(exprstate,
|
||||||
|
GetPerTupleExprContext(estate),
|
||||||
|
&const_is_null, NULL);
|
||||||
|
|
||||||
|
/* Get info needed about result datatype */
|
||||||
|
get_typlenbyval(result_type, &resultTypLen, &resultTypByVal);
|
||||||
|
|
||||||
|
/* Get back to outer memory context */
|
||||||
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Must copy result out of sub-context used by expression eval.
|
||||||
|
*
|
||||||
|
* Also, if it's varlena, forcibly detoast it. This protects us against
|
||||||
|
* storing TOAST pointers into plans that might outlive the referenced
|
||||||
|
* data. (makeConst would handle detoasting anyway, but it's worth a few
|
||||||
|
* extra lines here so that we can do the copy and detoast in one step.)
|
||||||
|
*/
|
||||||
|
if (!const_is_null)
|
||||||
|
{
|
||||||
|
if (resultTypLen == -1)
|
||||||
|
const_val = PointerGetDatum(PG_DETOAST_DATUM_COPY(const_val));
|
||||||
|
else
|
||||||
|
const_val = datumCopy(const_val, resultTypByVal, resultTypLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release all the junk we just created */
|
||||||
|
FreeExecutorState(estate);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make the constant result node.
|
||||||
|
*/
|
||||||
|
return (Expr *) makeConst(result_type, result_typmod, result_collation,
|
||||||
|
resultTypLen,
|
||||||
|
const_val, const_is_null,
|
||||||
|
resultTypByVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* *INDENT-ON* */
|
|
@ -0,0 +1,19 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* citus_clauses.h
|
||||||
|
* Routines roughly equivalent to postgres' util/clauses.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2012-2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CITUS_NODEFUNCS_H
|
||||||
|
#define CITUS_NODEFUNCS_H
|
||||||
|
|
||||||
|
#include "nodes/nodes.h"
|
||||||
|
#include "nodes/parsenodes.h"
|
||||||
|
|
||||||
|
extern void ExecuteFunctions(Query *query);
|
||||||
|
|
||||||
|
#endif /* CITUS_NODEFUNCS_H */
|
|
@ -16,8 +16,7 @@ extern bool AllModificationsCommutative;
|
||||||
|
|
||||||
|
|
||||||
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
|
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
|
||||||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count,
|
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||||
Task *task);
|
|
||||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
--
|
||||||
|
-- MULTI_FUNCTION_EVALUATION
|
||||||
|
--
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
|
||||||
|
-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL)
|
||||||
|
CREATE TABLE example (key INT, value INT);
|
||||||
|
SELECT master_create_distributed_table('example', 'key', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE SEQUENCE example_value_seq;
|
||||||
|
SELECT master_create_worker_shards('example', 1, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO example VALUES (1, nextval('example_value_seq'));
|
||||||
|
SELECT * FROM example;
|
||||||
|
key | value
|
||||||
|
-----+-------
|
||||||
|
1 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- functions called by prepared statements are also evaluated
|
||||||
|
PREPARE stmt AS INSERT INTO example VALUES (2);
|
||||||
|
EXECUTE stmt;
|
||||||
|
EXECUTE stmt;
|
||||||
|
SELECT * FROM example;
|
||||||
|
key | value
|
||||||
|
-----+-------
|
||||||
|
1 | 1
|
||||||
|
2 |
|
||||||
|
2 |
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- non-immutable functions inside CASE/COALESCE aren't allowed
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value timestamp;
|
||||||
|
-- this is allowed because there are no mutable funcs in the CASE
|
||||||
|
UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1;
|
||||||
|
-- this is allowed because the planner strips away the CASE during constant evaluation
|
||||||
|
UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1;
|
||||||
|
-- this is not allowed because there're mutable functions in a CaseWhen clause
|
||||||
|
-- (which we can't easily evaluate on the master)
|
||||||
|
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1;
|
||||||
|
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
|
||||||
|
-- make sure we also check defresult (the ELSE clause)
|
||||||
|
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1;
|
||||||
|
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
|
||||||
|
-- COALESCE is allowed
|
||||||
|
UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1;
|
||||||
|
-- COALESCE is not allowed if there are any mutable functions
|
||||||
|
UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1;
|
||||||
|
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
|
||||||
|
UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1;
|
||||||
|
ERROR: non-IMMUTABLE functions are not allowed in CASE or COALESCE statements
|
||||||
|
-- RowCompareExpr's are checked for mutability. These are allowed:
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value boolean;
|
||||||
|
ALTER TABLE example ADD time_col timestamptz;
|
||||||
|
UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1;
|
||||||
|
UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1;
|
||||||
|
-- But this RowCompareExpr is not (it passes Var into STABLE)
|
||||||
|
UPDATE example SET value = NULLIF(
|
||||||
|
ROW(date '10-10-1000', 2) < ROW(time_col, 3), true
|
||||||
|
) WHERE key = 1;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
-- DistinctExpr's are also checked for mutability. These are allowed:
|
||||||
|
UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1;
|
||||||
|
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1;
|
||||||
|
-- But this RowCompare references the STABLE = (date, timestamptz) operator
|
||||||
|
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed
|
||||||
|
UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
|
||||||
|
-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator
|
||||||
|
UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
-- CoerceViaIO (typoutput -> typinput, a type coercion)
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value date;
|
||||||
|
-- this one is allowed
|
||||||
|
UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1;
|
||||||
|
-- this one is not
|
||||||
|
UPDATE example SET value = time_col::date WHERE key = 1;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
-- ArrayCoerceExpr (applies elemfuncid to each elem)
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value date[];
|
||||||
|
-- this one is allowed
|
||||||
|
UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1;
|
||||||
|
-- this one is not
|
||||||
|
UPDATE example SET value = array[time_col]::date[] WHERE key = 1;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
-- test that UPDATE and DELETE also have the functions in WHERE evaluated
|
||||||
|
ALTER TABLE example DROP time_col;
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value timestamptz;
|
||||||
|
INSERT INTO example VALUES (3, now());
|
||||||
|
UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour';
|
||||||
|
SELECT * FROM example WHERE key = 3;
|
||||||
|
key | value
|
||||||
|
-----+------------------------------
|
||||||
|
3 | Tue Oct 10 00:00:00 2000 PDT
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour';
|
||||||
|
SELECT * FROM example WHERE key = 3;
|
||||||
|
key | value
|
||||||
|
-----+-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE example;
|
|
@ -192,16 +192,15 @@ SET client_min_messages TO DEFAULT;
|
||||||
-- commands with non-constant partition values are unsupported
|
-- commands with non-constant partition values are unsupported
|
||||||
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
||||||
'sell', 0.58);
|
'sell', 0.58);
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: values given for the partition column must be constants or constant expressions
|
||||||
-- commands with expressions that cannot be collapsed are unsupported
|
-- values for other columns are totally fine
|
||||||
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
|
||||||
-- commands with mutable functions in their quals
|
-- commands with mutable functions in their quals
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
||||||
-- commands with mutable but non-volatilte functions(ie: stable func.) in their quals
|
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp;
|
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
||||||
-- commands with multiple rows are unsupported
|
-- commands with multiple rows are unsupported
|
||||||
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
|
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
|
@ -433,12 +432,46 @@ UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWE
|
||||||
246 | gm | GM
|
246 | gm | GM
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- updates referencing non-IMMUTABLE functions are unsupported
|
ALTER TABLE limit_orders ADD COLUMN array_of_values integer[];
|
||||||
UPDATE limit_orders SET placed_at = now() WHERE id = 246;
|
-- updates referencing STABLE functions are allowed
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246;
|
||||||
|
-- so are binary operators
|
||||||
|
UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
|
||||||
|
CREATE FUNCTION immutable_append(old_values int[], new_value int)
|
||||||
|
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
|
||||||
|
-- immutable function calls with vars are also allowed
|
||||||
|
UPDATE limit_orders
|
||||||
|
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;
|
||||||
|
CREATE FUNCTION stable_append(old_values int[], new_value int)
|
||||||
|
RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$
|
||||||
|
LANGUAGE plpgsql STABLE;
|
||||||
|
-- but STABLE function calls with vars are not allowed
|
||||||
|
UPDATE limit_orders
|
||||||
|
SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246;
|
||||||
|
ERROR: STABLE functions used in UPDATE queries cannot be called with column references
|
||||||
|
SELECT array_of_values FROM limit_orders WHERE id = 246;
|
||||||
|
array_of_values
|
||||||
|
-----------------
|
||||||
|
{1,2}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- STRICT functions work as expected
|
||||||
|
CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
|
||||||
|
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
|
||||||
|
UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
|
||||||
|
ERROR: null value in column "bidder_id" violates not-null constraint
|
||||||
|
DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}).
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
SELECT array_of_values FROM limit_orders WHERE id = 246;
|
||||||
|
array_of_values
|
||||||
|
-----------------
|
||||||
|
{1,2}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE limit_orders DROP array_of_values;
|
||||||
-- even in RETURNING
|
-- even in RETURNING
|
||||||
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
|
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause
|
||||||
-- cursors are not supported
|
-- cursors are not supported
|
||||||
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
|
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
|
||||||
ERROR: distributed modifications must target exactly one shard
|
ERROR: distributed modifications must target exactly one shard
|
||||||
|
|
|
@ -34,13 +34,9 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table
|
||||||
ERROR: relation "temporary_nondistributed_table" is not a distributed table
|
ERROR: relation "temporary_nondistributed_table" is not a distributed table
|
||||||
-- commands with volatile functions in their quals
|
-- commands with volatile functions in their quals
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
|
||||||
-- commands with stable functions in their quals
|
|
||||||
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
|
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
|
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
|
||||||
-- commands with immutable functions in their quals
|
-- commands with immutable functions in their quals
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
|
||||||
master_modify_multiple_shards
|
master_modify_multiple_shards
|
||||||
|
@ -235,9 +231,14 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
||||||
47
|
47
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
|
||||||
-- updates referencing non-IMMUTABLE functions are unsupported
|
-- updates referencing non-IMMUTABLE functions are unsupported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- updates referencing IMMUTABLE functions in SET section are supported
|
-- updates referencing IMMUTABLE functions in SET section are supported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
|
||||||
master_modify_multiple_shards
|
master_modify_multiple_shards
|
||||||
|
@ -251,10 +252,21 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
||||||
78
|
78
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- updates referencing STABLE functions in SET section are not supported
|
-- updates referencing STABLE functions in SET section are supported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- updates referencing VOLATILE functions in SET section are not supported
|
-- updates referencing VOLATILE functions in SET section are not supported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
|
||||||
|
-- commands with stable functions in their quals are allowed
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;
|
||||||
|
|
|
@ -242,15 +242,15 @@ DETAIL: Subqueries are not supported in distributed modifications.
|
||||||
-- non mutable function call in the SET
|
-- non mutable function call in the SET
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
||||||
UPDATE SET other_col = random()::int;
|
UPDATE SET other_col = random()::int;
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
|
||||||
-- non mutable function call in the WHERE
|
-- non mutable function call in the WHERE
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
||||||
UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int;
|
UPDATE SET other_col = 5 WHERE upsert_test.other_col = random()::int;
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
|
||||||
-- non mutable function call in the arbiter WHERE
|
-- non mutable function call in the arbiter WHERE
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) WHERE part_key = random()::int
|
||||||
DO UPDATE SET other_col = 5;
|
DO UPDATE SET other_col = 5;
|
||||||
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
|
ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
|
||||||
-- error out on attempt to update the partition key
|
-- error out on attempt to update the partition key
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
||||||
UPDATE SET part_key = 15;
|
UPDATE SET part_key = 15;
|
||||||
|
|
|
@ -154,3 +154,8 @@ test: multi_drop_extension
|
||||||
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem
|
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_schema_support
|
test: multi_schema_support
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
|
||||||
|
# ----------
|
||||||
|
test: multi_function_evaluation
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
--
|
||||||
|
-- MULTI_FUNCTION_EVALUATION
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
|
||||||
|
|
||||||
|
-- nextval() works (no good way to test DEFAULT, or, by extension, SERIAL)
|
||||||
|
|
||||||
|
CREATE TABLE example (key INT, value INT);
|
||||||
|
SELECT master_create_distributed_table('example', 'key', 'hash');
|
||||||
|
CREATE SEQUENCE example_value_seq;
|
||||||
|
SELECT master_create_worker_shards('example', 1, 2);
|
||||||
|
INSERT INTO example VALUES (1, nextval('example_value_seq'));
|
||||||
|
SELECT * FROM example;
|
||||||
|
|
||||||
|
-- functions called by prepared statements are also evaluated
|
||||||
|
|
||||||
|
PREPARE stmt AS INSERT INTO example VALUES (2);
|
||||||
|
EXECUTE stmt;
|
||||||
|
EXECUTE stmt;
|
||||||
|
SELECT * FROM example;
|
||||||
|
|
||||||
|
-- non-immutable functions inside CASE/COALESCE aren't allowed
|
||||||
|
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value timestamp;
|
||||||
|
|
||||||
|
-- this is allowed because there are no mutable funcs in the CASE
|
||||||
|
UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1;
|
||||||
|
|
||||||
|
-- this is allowed because the planner strips away the CASE during constant evaluation
|
||||||
|
UPDATE example SET value = CASE WHEN true THEN now() ELSE now() + interval '1 hour' END WHERE key = 1;
|
||||||
|
|
||||||
|
-- this is not allowed because there're mutable functions in a CaseWhen clause
|
||||||
|
-- (which we can't easily evaluate on the master)
|
||||||
|
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN now() ELSE timestamp '10-24-1190' END) WHERE key = 1;
|
||||||
|
|
||||||
|
-- make sure we also check defresult (the ELSE clause)
|
||||||
|
UPDATE example SET value = (CASE WHEN now() > timestamp '12-12-1991' THEN timestamp '12-12-1191' ELSE now() END) WHERE key = 1;
|
||||||
|
|
||||||
|
-- COALESCE is allowed
|
||||||
|
UPDATE example SET value = COALESCE(null, null, timestamp '10-10-1000') WHERE key = 1;
|
||||||
|
|
||||||
|
-- COALESCE is not allowed if there are any mutable functions
|
||||||
|
UPDATE example SET value = COALESCE(now(), timestamp '10-10-1000') WHERE key = 1;
|
||||||
|
UPDATE example SET value = COALESCE(timestamp '10-10-1000', now()) WHERE key = 1;
|
||||||
|
|
||||||
|
-- RowCompareExpr's are checked for mutability. These are allowed:
|
||||||
|
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value boolean;
|
||||||
|
ALTER TABLE example ADD time_col timestamptz;
|
||||||
|
|
||||||
|
UPDATE example SET value = NULLIF(ROW(1, 2) < ROW(2, 3), true) WHERE key = 1;
|
||||||
|
UPDATE example SET value = NULLIF(ROW(true, 2) < ROW(value, 3), true) WHERE key = 1;
|
||||||
|
|
||||||
|
-- But this RowCompareExpr is not (it passes Var into STABLE)
|
||||||
|
|
||||||
|
UPDATE example SET value = NULLIF(
|
||||||
|
ROW(date '10-10-1000', 2) < ROW(time_col, 3), true
|
||||||
|
) WHERE key = 1;
|
||||||
|
|
||||||
|
-- DistinctExpr's are also checked for mutability. These are allowed:
|
||||||
|
|
||||||
|
UPDATE example SET value = 1 IS DISTINCT FROM 2 WHERE key = 1;
|
||||||
|
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM timestamptz '10-10-1000' WHERE key = 1;
|
||||||
|
|
||||||
|
-- But this RowCompare references the STABLE = (date, timestamptz) operator
|
||||||
|
|
||||||
|
UPDATE example SET value = date '10-10-1000' IS DISTINCT FROM time_col WHERE key = 1;
|
||||||
|
|
||||||
|
-- this ScalarArrayOpExpr ("scalar op ANY/ALL (array)") is allowed
|
||||||
|
|
||||||
|
UPDATE example SET value = date '10-10-1000' = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
|
||||||
|
|
||||||
|
-- this ScalarArrayOpExpr is not, it invokes the STABLE = (timestamptz, date) operator
|
||||||
|
|
||||||
|
UPDATE example SET value = time_col = ANY ('{10-10-1000}'::date[]) WHERE key = 1;
|
||||||
|
|
||||||
|
-- CoerceViaIO (typoutput -> typinput, a type coercion)
|
||||||
|
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value date;
|
||||||
|
|
||||||
|
-- this one is allowed
|
||||||
|
UPDATE example SET value = (timestamp '10-19-2000 13:29')::date WHERE key = 1;
|
||||||
|
-- this one is not
|
||||||
|
UPDATE example SET value = time_col::date WHERE key = 1;
|
||||||
|
|
||||||
|
-- ArrayCoerceExpr (applies elemfuncid to each elem)
|
||||||
|
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value date[];
|
||||||
|
|
||||||
|
-- this one is allowed
|
||||||
|
UPDATE example SET value = array[timestamptz '10-20-2013 10:20']::date[] WHERE key = 1;
|
||||||
|
-- this one is not
|
||||||
|
UPDATE example SET value = array[time_col]::date[] WHERE key = 1;
|
||||||
|
|
||||||
|
-- test that UPDATE and DELETE also have the functions in WHERE evaluated
|
||||||
|
|
||||||
|
ALTER TABLE example DROP time_col;
|
||||||
|
ALTER TABLE example DROP value;
|
||||||
|
ALTER TABLE example ADD value timestamptz;
|
||||||
|
|
||||||
|
INSERT INTO example VALUES (3, now());
|
||||||
|
UPDATE example SET value = timestamp '10-10-2000 00:00' WHERE key = 3 AND value > now() - interval '1 hour';
|
||||||
|
SELECT * FROM example WHERE key = 3;
|
||||||
|
|
||||||
|
DELETE FROM example WHERE key = 3 AND value < now() - interval '1 hour';
|
||||||
|
SELECT * FROM example WHERE key = 3;
|
||||||
|
|
||||||
|
DROP TABLE example;
|
|
@ -137,14 +137,15 @@ SET client_min_messages TO DEFAULT;
|
||||||
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
||||||
'sell', 0.58);
|
'sell', 0.58);
|
||||||
|
|
||||||
-- commands with expressions that cannot be collapsed are unsupported
|
-- values for other columns are totally fine
|
||||||
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
|
||||||
|
|
||||||
-- commands with mutable functions in their quals
|
-- commands with mutable functions in their quals
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
|
||||||
|
|
||||||
-- commands with mutable but non-volatilte functions(ie: stable func.) in their quals
|
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
|
||||||
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp;
|
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
|
||||||
|
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
|
||||||
|
|
||||||
-- commands with multiple rows are unsupported
|
-- commands with multiple rows are unsupported
|
||||||
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
|
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
|
||||||
|
@ -310,8 +311,38 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
|
||||||
-- IMMUTABLE functions are allowed -- even in returning
|
-- IMMUTABLE functions are allowed -- even in returning
|
||||||
UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol;
|
UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol;
|
||||||
|
|
||||||
-- updates referencing non-IMMUTABLE functions are unsupported
|
ALTER TABLE limit_orders ADD COLUMN array_of_values integer[];
|
||||||
UPDATE limit_orders SET placed_at = now() WHERE id = 246;
|
|
||||||
|
-- updates referencing STABLE functions are allowed
|
||||||
|
UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246;
|
||||||
|
-- so are binary operators
|
||||||
|
UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
|
||||||
|
|
||||||
|
CREATE FUNCTION immutable_append(old_values int[], new_value int)
|
||||||
|
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
|
||||||
|
|
||||||
|
-- immutable function calls with vars are also allowed
|
||||||
|
UPDATE limit_orders
|
||||||
|
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;
|
||||||
|
|
||||||
|
CREATE FUNCTION stable_append(old_values int[], new_value int)
|
||||||
|
RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$
|
||||||
|
LANGUAGE plpgsql STABLE;
|
||||||
|
|
||||||
|
-- but STABLE function calls with vars are not allowed
|
||||||
|
UPDATE limit_orders
|
||||||
|
SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246;
|
||||||
|
|
||||||
|
SELECT array_of_values FROM limit_orders WHERE id = 246;
|
||||||
|
|
||||||
|
-- STRICT functions work as expected
|
||||||
|
CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS
|
||||||
|
'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT;
|
||||||
|
UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246;
|
||||||
|
|
||||||
|
SELECT array_of_values FROM limit_orders WHERE id = 246;
|
||||||
|
|
||||||
|
ALTER TABLE limit_orders DROP array_of_values;
|
||||||
|
|
||||||
-- even in RETURNING
|
-- even in RETURNING
|
||||||
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
|
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
|
||||||
|
|
|
@ -60,10 +60,6 @@ SELECT master_modify_multiple_shards('DELETE FROM temporary_nondistributed_table
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (random() * 1000)');
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_value = (random() * 1000)');
|
||||||
|
|
||||||
-- commands with stable functions in their quals
|
|
||||||
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
|
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
|
|
||||||
|
|
||||||
-- commands with immutable functions in their quals
|
-- commands with immutable functions in their quals
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = abs(-3)');
|
||||||
|
|
||||||
|
@ -140,6 +136,8 @@ SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = t_value + 37 WHERE t_key = 10');
|
||||||
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
||||||
|
|
||||||
|
CREATE FUNCTION temp_stable_func() RETURNS integer AS 'SELECT 10;' LANGUAGE SQL STABLE;
|
||||||
|
|
||||||
-- updates referencing non-IMMUTABLE functions are unsupported
|
-- updates referencing non-IMMUTABLE functions are unsupported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL!'' WHERE t_key = temp_stable_func()');
|
||||||
|
|
||||||
|
@ -147,10 +145,13 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = abs(-78) WHERE t_key = 10');
|
||||||
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
SELECT t_value FROM multi_shard_modify_test WHERE t_key=10;
|
||||||
|
|
||||||
-- updates referencing STABLE functions in SET section are not supported
|
-- updates referencing STABLE functions in SET section are supported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = temp_stable_func() * 2 WHERE t_key = 10');
|
||||||
|
|
||||||
-- updates referencing VOLATILE functions in SET section are not supported
|
-- updates referencing VOLATILE functions in SET section are not supported
|
||||||
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
|
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_value = random() WHERE t_key = 10');
|
||||||
|
|
||||||
|
-- commands with stable functions in their quals are allowed
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = temp_stable_func()');
|
||||||
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 102046;
|
||||||
|
|
Loading…
Reference in New Issue