diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 932bf6537..9d3f99541 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -31,6 +31,7 @@ #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -38,6 +39,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" @@ -382,7 +384,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) * concurrently. */ - LockRelationShardListResources(task->relationShardList, ExclusiveLock); + LockRelationShardResources(task->relationShardList, ExclusiveLock); } } @@ -462,7 +464,7 @@ AcquireExecutorMultiShardLocks(List *taskList) * concurrently. */ - LockRelationShardListResources(task->relationShardList, ExclusiveLock); + LockRelationShardResources(task->relationShardList, ExclusiveLock); } } } @@ -621,27 +623,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (requiresMasterEvaluation) { - ListCell *taskCell = NULL; - Query *query = workerJob->jobQuery; - Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid; + Query *jobQuery = workerJob->jobQuery; - ExecuteMasterEvaluableFunctions(query); - - foreach(taskCell, taskList) - { - Task *task = (Task *) lfirst(taskCell); - StringInfo newQueryString = makeStringInfo(); - - deparse_shard_query(query, relationId, task->anchorShardId, - newQueryString); - - ereport(DEBUG4, (errmsg("query before master evaluation: %s", - task->queryString))); - ereport(DEBUG4, (errmsg("query after master evaluation: %s", - newQueryString->data))); - - task->queryString = newQueryString->data; - } + ExecuteMasterEvaluableFunctions(jobQuery); + RebuildQueryStrings(jobQuery, taskList); } if (list_length(taskList) == 1) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 86545479a..e67a68b3f 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1451,42 +1451,6 @@ FindNodesOfType(MultiNode *node, int type) } -/* - * IdentifyRTE assigns an identifier to an RTE, for tracking purposes. - * - * To be able to track RTEs through postgres' query planning, which copies and - * duplicate, and modifies them, we sometimes need to figure out whether two - * RTEs are copies of the same original RTE. For that we, hackishly, use a - * field normally unused in RTE_RELATION RTEs. - * - * The assigned identifier better be unique within a plantree. - */ -void -IdentifyRTE(RangeTblEntry *rte, int identifier) -{ - Assert(rte->rtekind == RTE_RELATION); - Assert(rte->values_lists == NIL); - rte->values_lists = list_make1_int(identifier); -} - - -/* GetRTEIdentity returns the identity assigned with IdentifyRTE. */ -int -GetRTEIdentity(RangeTblEntry *rte) -{ - Assert(rte->rtekind == RTE_RELATION); - Assert(IsA(rte->values_lists, IntList)); - Assert(list_length(rte->values_lists) == 1); - - if (rte->values_lists == NULL) - { - return 0; - } - - return linitial_int(rte->values_lists); -} - - /* * NeedsDistributedPlanning checks if the passed in query is a query running * on a distributed table. If it is, we start distributed planning. @@ -1501,7 +1465,6 @@ NeedsDistributedPlanning(Query *queryTree) ListCell *rangeTableCell = NULL; bool hasLocalRelation = false; bool hasDistributedRelation = false; - int rteIdentifier = 1; if (commandType != CMD_SELECT && commandType != CMD_INSERT && commandType != CMD_UPDATE && commandType != CMD_DELETE) @@ -1522,17 +1485,6 @@ NeedsDistributedPlanning(Query *queryTree) if (IsDistributedTable(relationId)) { hasDistributedRelation = true; - - /* - * To be able to track individual RTEs through postgres' query - * planning, we need to be able to figure out whether an RTE is - * actually a copy of another, rather than a different one. We - * simply number the RTEs starting from 1. - */ - if (rangeTableEntry->rtekind == RTE_RELATION) - { - IdentifyRTE(rangeTableEntry, rteIdentifier++); - } } else { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9bfdc1912..b77eada52 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -289,8 +289,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery, workerJob->dependedJobList = NIL; workerJob->jobId = jobId; workerJob->jobQuery = originalQuery; - - /* for now we do not support any function evaluation */ workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* and finally the multi plan */ diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 521ad3aa7..f0d632e5b 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -9,6 +9,7 @@ #include "postgres.h" #include "distributed/citus_clauses.h" +#include "distributed/multi_router_planner.h" #include "catalog/pg_type.h" #include "executor/executor.h" @@ -35,6 +36,8 @@ bool RequiresMasterEvaluation(Query *query) { ListCell *targetEntryCell = NULL; + ListCell *rteCell = NULL; + ListCell *cteCell = NULL; foreach(targetEntryCell, query->targetList) { @@ -46,6 +49,31 @@ RequiresMasterEvaluation(Query *query) } } + foreach(rteCell, query->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); + + if (rte->rtekind != RTE_SUBQUERY) + { + continue; + } + + if (RequiresMasterEvaluation(rte->subquery)) + { + return true; + } + } + + foreach(cteCell, query->cteList) + { + CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell); + + if (RequiresMasterEvaluation((Query *) expr->ctequery)) + { + return true; + } + } + if (query->jointree && query->jointree->quals) { return contain_mutable_functions((Node *) query->jointree->quals); @@ -64,7 +92,10 @@ ExecuteMasterEvaluableFunctions(Query *query) { CmdType commandType = query->commandType; ListCell *targetEntryCell = NULL; + ListCell *rteCell = NULL; + ListCell *cteCell = NULL; Node *modifiedNode = NULL; + bool insertSelectQuery = InsertSelectQuery(query); if (query->jointree && query->jointree->quals) { @@ -81,7 +112,7 @@ ExecuteMasterEvaluableFunctions(Query *query) continue; } - if (commandType == CMD_INSERT) + if (commandType == CMD_INSERT && !insertSelectQuery) { modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr); } @@ -93,11 +124,24 @@ ExecuteMasterEvaluableFunctions(Query *query) targetEntry->expr = (Expr *) modifiedNode; } - if (query->jointree) + foreach(rteCell, query->rtable) { - Assert(!contain_mutable_functions((Node *) (query->jointree->quals))); + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); + + if (rte->rtekind != RTE_SUBQUERY) + { + continue; + } + + ExecuteMasterEvaluableFunctions(rte->subquery); + } + + foreach(cteCell, query->cteList) + { + CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell); + + ExecuteMasterEvaluableFunctions((Query *) expr->ctequery); } - Assert(!contain_mutable_functions((Node *) (query->targetList))); } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 127b9aca9..766bc166a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -305,11 +305,11 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) /* - * LockRelationShards takes locks on all shards in a list of RelationShards + * LockRelationShardResources takes locks on all shards in a list of RelationShards * to prevent concurrent DML statements on those shards. */ void -LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode) +LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) { ListCell *relationShardCell = NULL; diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 909a88339..6487c7a28 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -182,8 +182,6 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree); extern bool NeedsDistributedPlanning(Query *queryTree); -extern int GetRTEIdentity(RangeTblEntry *rte); -extern void IdentifyRTE(RangeTblEntry *rte, int identifier); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index a1e224f9e..67d02a037 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -79,7 +79,7 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); /* Lock multiple shards for safe modification */ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); -extern void LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode); +extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); extern void LockMetadataSnapshot(LOCKMODE lockMode); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 64422f174..c08a0c526 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -107,6 +107,86 @@ INSERT INTO raw_events_second SELECT * FROM raw_events_first; ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300004" DETAIL: Key (user_id, value_1)=(1, 10) already exists. CONTEXT: while executing command on localhost:57637 +-- stable functions should be allowed +INSERT INTO raw_events_second (user_id, time) +SELECT + user_id, now() +FROM + raw_events_first +WHERE + user_id < 0; +INSERT INTO raw_events_second (user_id) +SELECT + user_id +FROM + raw_events_first +WHERE + time > now() + interval '1 day'; +-- hide version-dependent PL/pgSQL context messages +\set VERBOSITY terse +-- make sure we evaluate stable functions on the master, once +CREATE OR REPLACE FUNCTION evaluate_on_master() +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN 0; +END; +$function$; +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master() +FROM + raw_events_first +WHERE + user_id < 0; +NOTICE: evaluating on master +-- make sure stable functions in CTEs are evaluated +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT evaluate_on_master()) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first +WHERE + user_id < 0; +NOTICE: evaluating on master +-- make sure we don't evaluate stable functions with column arguments +CREATE OR REPLACE FUNCTION evaluate_on_master(x int) +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN x; +END; +$function$; +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master(value_1) +FROM + raw_events_first +WHERE + user_id = 0; +WARNING: function public.evaluate_on_master(integer) does not exist +WARNING: function public.evaluate_on_master(integer) does not exist +ERROR: could not modify any active placements +\set VERBOSITY default +-- volatile functions should be disallowed +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, (random()*10)::int +FROM + raw_events_first; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT (random()*10)::int) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -1861,12 +1941,44 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: --- set back to the default -SET citus.shard_count TO DEFAULT; +RESET client_min_messages; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: ProcessUtility -DEBUG: CommitTransactionCommand -DEBUG: CommitTransaction -DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +-- Stable function in default should be allowed +ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now(); +INSERT INTO table_with_defaults (store_id, first_name, last_name) +SELECT + store_id, 'first '||store_id, 'last '||store_id +FROM + table_with_defaults +GROUP BY + store_id, first_name, last_name; +-- Volatile function in default should be disallowed +CREATE TABLE table_with_serial ( + store_id int, + s bigserial +); +SELECT create_distributed_table('table_with_serial', 'store_id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO table_with_serial (store_id) +SELECT + store_id +FROM + table_with_defaults +GROUP BY + store_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +DROP TABLE raw_events_first CASCADE; +NOTICE: drop cascades to view test_view +DROP TABLE raw_events_second; +DROP TABLE reference_table; +DROP TABLE agg_events; +DROP TABLE table_with_defaults; +DROP TABLE table_with_serial; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index c327d829a..db1e0b7bf 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -56,6 +56,89 @@ WHERE -- see that we get unique vialitons INSERT INTO raw_events_second SELECT * FROM raw_events_first; +-- stable functions should be allowed +INSERT INTO raw_events_second (user_id, time) +SELECT + user_id, now() +FROM + raw_events_first +WHERE + user_id < 0; + +INSERT INTO raw_events_second (user_id) +SELECT + user_id +FROM + raw_events_first +WHERE + time > now() + interval '1 day'; + +-- hide version-dependent PL/pgSQL context messages +\set VERBOSITY terse + +-- make sure we evaluate stable functions on the master, once +CREATE OR REPLACE FUNCTION evaluate_on_master() +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN 0; +END; +$function$; + +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master() +FROM + raw_events_first +WHERE + user_id < 0; + +-- make sure stable functions in CTEs are evaluated +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT evaluate_on_master()) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first +WHERE + user_id < 0; + +-- make sure we don't evaluate stable functions with column arguments +CREATE OR REPLACE FUNCTION evaluate_on_master(x int) +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN x; +END; +$function$; + +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master(value_1) +FROM + raw_events_first +WHERE + user_id = 0; + +\set VERBOSITY default + +-- volatile functions should be disallowed +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, (random()*10)::int +FROM + raw_events_first; + +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT (random()*10)::int) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first; + + -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -824,5 +907,37 @@ FROM GROUP BY last_name, store_id, first_name, default_2; --- set back to the default -SET citus.shard_count TO DEFAULT; +RESET client_min_messages; + +-- Stable function in default should be allowed +ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now(); + +INSERT INTO table_with_defaults (store_id, first_name, last_name) +SELECT + store_id, 'first '||store_id, 'last '||store_id +FROM + table_with_defaults +GROUP BY + store_id, first_name, last_name; + +-- Volatile function in default should be disallowed +CREATE TABLE table_with_serial ( + store_id int, + s bigserial +); +SELECT create_distributed_table('table_with_serial', 'store_id'); + +INSERT INTO table_with_serial (store_id) +SELECT + store_id +FROM + table_with_defaults +GROUP BY + store_id; + +DROP TABLE raw_events_first CASCADE; +DROP TABLE raw_events_second; +DROP TABLE reference_table; +DROP TABLE agg_events; +DROP TABLE table_with_defaults; +DROP TABLE table_with_serial;