Enable evaluation of stable functions in INSERT..SELECT

pull/1938/head
Marco Slot 2016-12-02 16:42:22 +01:00
parent f058ba3ec0
commit 9cdea04466
9 changed files with 292 additions and 88 deletions

View File

@ -31,6 +31,7 @@
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
@ -38,6 +39,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
@ -382,7 +384,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* concurrently.
*/
LockRelationShardListResources(task->relationShardList, ExclusiveLock);
LockRelationShardResources(task->relationShardList, ExclusiveLock);
}
}
@ -462,7 +464,7 @@ AcquireExecutorMultiShardLocks(List *taskList)
* concurrently.
*/
LockRelationShardListResources(task->relationShardList, ExclusiveLock);
LockRelationShardResources(task->relationShardList, ExclusiveLock);
}
}
}
@ -621,27 +623,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
if (requiresMasterEvaluation)
{
ListCell *taskCell = NULL;
Query *query = workerJob->jobQuery;
Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid;
Query *jobQuery = workerJob->jobQuery;
ExecuteMasterEvaluableFunctions(query);
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
StringInfo newQueryString = makeStringInfo();
deparse_shard_query(query, relationId, task->anchorShardId,
newQueryString);
ereport(DEBUG4, (errmsg("query before master evaluation: %s",
task->queryString)));
ereport(DEBUG4, (errmsg("query after master evaluation: %s",
newQueryString->data)));
task->queryString = newQueryString->data;
}
ExecuteMasterEvaluableFunctions(jobQuery);
RebuildQueryStrings(jobQuery, taskList);
}
if (list_length(taskList) == 1)

View File

@ -1451,42 +1451,6 @@ FindNodesOfType(MultiNode *node, int type)
}
/*
* IdentifyRTE assigns an identifier to an RTE, for tracking purposes.
*
* To be able to track RTEs through postgres' query planning, which copies and
* duplicate, and modifies them, we sometimes need to figure out whether two
* RTEs are copies of the same original RTE. For that we, hackishly, use a
* field normally unused in RTE_RELATION RTEs.
*
* The assigned identifier better be unique within a plantree.
*/
void
IdentifyRTE(RangeTblEntry *rte, int identifier)
{
Assert(rte->rtekind == RTE_RELATION);
Assert(rte->values_lists == NIL);
rte->values_lists = list_make1_int(identifier);
}
/* GetRTEIdentity returns the identity assigned with IdentifyRTE. */
int
GetRTEIdentity(RangeTblEntry *rte)
{
Assert(rte->rtekind == RTE_RELATION);
Assert(IsA(rte->values_lists, IntList));
Assert(list_length(rte->values_lists) == 1);
if (rte->values_lists == NULL)
{
return 0;
}
return linitial_int(rte->values_lists);
}
/*
* NeedsDistributedPlanning checks if the passed in query is a query running
* on a distributed table. If it is, we start distributed planning.
@ -1501,7 +1465,6 @@ NeedsDistributedPlanning(Query *queryTree)
ListCell *rangeTableCell = NULL;
bool hasLocalRelation = false;
bool hasDistributedRelation = false;
int rteIdentifier = 1;
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE)
@ -1522,17 +1485,6 @@ NeedsDistributedPlanning(Query *queryTree)
if (IsDistributedTable(relationId))
{
hasDistributedRelation = true;
/*
* To be able to track individual RTEs through postgres' query
* planning, we need to be able to figure out whether an RTE is
* actually a copy of another, rather than a different one. We
* simply number the RTEs starting from 1.
*/
if (rangeTableEntry->rtekind == RTE_RELATION)
{
IdentifyRTE(rangeTableEntry, rteIdentifier++);
}
}
else
{

View File

@ -289,8 +289,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
workerJob->dependedJobList = NIL;
workerJob->jobId = jobId;
workerJob->jobQuery = originalQuery;
/* for now we do not support any function evaluation */
workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
/* and finally the multi plan */

View File

@ -9,6 +9,7 @@
#include "postgres.h"
#include "distributed/citus_clauses.h"
#include "distributed/multi_router_planner.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
@ -35,6 +36,8 @@ bool
RequiresMasterEvaluation(Query *query)
{
ListCell *targetEntryCell = NULL;
ListCell *rteCell = NULL;
ListCell *cteCell = NULL;
foreach(targetEntryCell, query->targetList)
{
@ -46,6 +49,31 @@ RequiresMasterEvaluation(Query *query)
}
}
foreach(rteCell, query->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
if (rte->rtekind != RTE_SUBQUERY)
{
continue;
}
if (RequiresMasterEvaluation(rte->subquery))
{
return true;
}
}
foreach(cteCell, query->cteList)
{
CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell);
if (RequiresMasterEvaluation((Query *) expr->ctequery))
{
return true;
}
}
if (query->jointree && query->jointree->quals)
{
return contain_mutable_functions((Node *) query->jointree->quals);
@ -64,7 +92,10 @@ ExecuteMasterEvaluableFunctions(Query *query)
{
CmdType commandType = query->commandType;
ListCell *targetEntryCell = NULL;
ListCell *rteCell = NULL;
ListCell *cteCell = NULL;
Node *modifiedNode = NULL;
bool insertSelectQuery = InsertSelectQuery(query);
if (query->jointree && query->jointree->quals)
{
@ -81,7 +112,7 @@ ExecuteMasterEvaluableFunctions(Query *query)
continue;
}
if (commandType == CMD_INSERT)
if (commandType == CMD_INSERT && !insertSelectQuery)
{
modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr);
}
@ -93,11 +124,24 @@ ExecuteMasterEvaluableFunctions(Query *query)
targetEntry->expr = (Expr *) modifiedNode;
}
if (query->jointree)
foreach(rteCell, query->rtable)
{
Assert(!contain_mutable_functions((Node *) (query->jointree->quals)));
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
if (rte->rtekind != RTE_SUBQUERY)
{
continue;
}
ExecuteMasterEvaluableFunctions(rte->subquery);
}
foreach(cteCell, query->cteList)
{
CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell);
ExecuteMasterEvaluableFunctions((Query *) expr->ctequery);
}
Assert(!contain_mutable_functions((Node *) (query->targetList)));
}

View File

@ -305,11 +305,11 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode)
/*
* LockRelationShards takes locks on all shards in a list of RelationShards
* LockRelationShardResources takes locks on all shards in a list of RelationShards
* to prevent concurrent DML statements on those shards.
*/
void
LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode)
LockRelationShardResources(List *relationShardList, LOCKMODE lockMode)
{
ListCell *relationShardCell = NULL;

View File

@ -182,8 +182,6 @@ extern bool SubqueryPushdown;
/* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree);
extern bool NeedsDistributedPlanning(Query *queryTree);
extern int GetRTEIdentity(RangeTblEntry *rte);
extern void IdentifyRTE(RangeTblEntry *rte, int identifier);
extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);

View File

@ -79,7 +79,7 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
/* Lock multiple shards for safe modification */
extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
extern void LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode);
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
extern void LockMetadataSnapshot(LOCKMODE lockMode);

View File

@ -107,6 +107,86 @@ INSERT INTO raw_events_second SELECT * FROM raw_events_first;
ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300004"
DETAIL: Key (user_id, value_1)=(1, 10) already exists.
CONTEXT: while executing command on localhost:57637
-- stable functions should be allowed
INSERT INTO raw_events_second (user_id, time)
SELECT
user_id, now()
FROM
raw_events_first
WHERE
user_id < 0;
INSERT INTO raw_events_second (user_id)
SELECT
user_id
FROM
raw_events_first
WHERE
time > now() + interval '1 day';
-- hide version-dependent PL/pgSQL context messages
\set VERBOSITY terse
-- make sure we evaluate stable functions on the master, once
CREATE OR REPLACE FUNCTION evaluate_on_master()
RETURNS int LANGUAGE plpgsql STABLE
AS $function$
BEGIN
RAISE NOTICE 'evaluating on master';
RETURN 0;
END;
$function$;
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, evaluate_on_master()
FROM
raw_events_first
WHERE
user_id < 0;
NOTICE: evaluating on master
-- make sure stable functions in CTEs are evaluated
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT evaluate_on_master())
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first
WHERE
user_id < 0;
NOTICE: evaluating on master
-- make sure we don't evaluate stable functions with column arguments
CREATE OR REPLACE FUNCTION evaluate_on_master(x int)
RETURNS int LANGUAGE plpgsql STABLE
AS $function$
BEGIN
RAISE NOTICE 'evaluating on master';
RETURN x;
END;
$function$;
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, evaluate_on_master(value_1)
FROM
raw_events_first
WHERE
user_id = 0;
WARNING: function public.evaluate_on_master(integer) does not exist
WARNING: function public.evaluate_on_master(integer) does not exist
ERROR: could not modify any active placements
\set VERBOSITY default
-- volatile functions should be disallowed
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, (random()*10)::int
FROM
raw_events_first;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT (random()*10)::int)
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
@ -1861,12 +1941,44 @@ DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
DEBUG: CommitTransaction
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
-- set back to the default
SET citus.shard_count TO DEFAULT;
RESET client_min_messages;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
DEBUG: CommitTransaction
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
-- Stable function in default should be allowed
ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now();
INSERT INTO table_with_defaults (store_id, first_name, last_name)
SELECT
store_id, 'first '||store_id, 'last '||store_id
FROM
table_with_defaults
GROUP BY
store_id, first_name, last_name;
-- Volatile function in default should be disallowed
CREATE TABLE table_with_serial (
store_id int,
s bigserial
);
SELECT create_distributed_table('table_with_serial', 'store_id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO table_with_serial (store_id)
SELECT
store_id
FROM
table_with_defaults
GROUP BY
store_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries
DROP TABLE raw_events_first CASCADE;
NOTICE: drop cascades to view test_view
DROP TABLE raw_events_second;
DROP TABLE reference_table;
DROP TABLE agg_events;
DROP TABLE table_with_defaults;
DROP TABLE table_with_serial;

View File

@ -56,6 +56,89 @@ WHERE
-- see that we get unique vialitons
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
-- stable functions should be allowed
INSERT INTO raw_events_second (user_id, time)
SELECT
user_id, now()
FROM
raw_events_first
WHERE
user_id < 0;
INSERT INTO raw_events_second (user_id)
SELECT
user_id
FROM
raw_events_first
WHERE
time > now() + interval '1 day';
-- hide version-dependent PL/pgSQL context messages
\set VERBOSITY terse
-- make sure we evaluate stable functions on the master, once
CREATE OR REPLACE FUNCTION evaluate_on_master()
RETURNS int LANGUAGE plpgsql STABLE
AS $function$
BEGIN
RAISE NOTICE 'evaluating on master';
RETURN 0;
END;
$function$;
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, evaluate_on_master()
FROM
raw_events_first
WHERE
user_id < 0;
-- make sure stable functions in CTEs are evaluated
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT evaluate_on_master())
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first
WHERE
user_id < 0;
-- make sure we don't evaluate stable functions with column arguments
CREATE OR REPLACE FUNCTION evaluate_on_master(x int)
RETURNS int LANGUAGE plpgsql STABLE
AS $function$
BEGIN
RAISE NOTICE 'evaluating on master';
RETURN x;
END;
$function$;
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, evaluate_on_master(value_1)
FROM
raw_events_first
WHERE
user_id = 0;
\set VERBOSITY default
-- volatile functions should be disallowed
INSERT INTO raw_events_second (user_id, value_1)
SELECT
user_id, (random()*10)::int
FROM
raw_events_first;
INSERT INTO raw_events_second (user_id, value_1)
WITH sub_cte AS (SELECT (random()*10)::int)
SELECT
user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
@ -824,5 +907,37 @@ FROM
GROUP BY
last_name, store_id, first_name, default_2;
-- set back to the default
SET citus.shard_count TO DEFAULT;
RESET client_min_messages;
-- Stable function in default should be allowed
ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now();
INSERT INTO table_with_defaults (store_id, first_name, last_name)
SELECT
store_id, 'first '||store_id, 'last '||store_id
FROM
table_with_defaults
GROUP BY
store_id, first_name, last_name;
-- Volatile function in default should be disallowed
CREATE TABLE table_with_serial (
store_id int,
s bigserial
);
SELECT create_distributed_table('table_with_serial', 'store_id');
INSERT INTO table_with_serial (store_id)
SELECT
store_id
FROM
table_with_defaults
GROUP BY
store_id;
DROP TABLE raw_events_first CASCADE;
DROP TABLE raw_events_second;
DROP TABLE reference_table;
DROP TABLE agg_events;
DROP TABLE table_with_defaults;
DROP TABLE table_with_serial;