From d745d7bf70cbf12318b84d4971ddab22224f2c84 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 2 Dec 2016 12:12:47 +0100 Subject: [PATCH 1/2] Add explicit RelationShards mapping to tasks --- .../executor/multi_router_executor.c | 4 +- .../distributed/planner/deparse_shard_query.c | 238 +++++++++++++++++ .../planner/multi_logical_planner.c | 5 + .../planner/multi_router_planner.c | 250 +++++------------- .../distributed/utils/citus_nodefuncs.c | 5 +- .../distributed/utils/citus_outfuncs.c | 20 +- .../distributed/utils/citus_readfuncs.c | 15 +- .../distributed/utils/citus_readfuncs_95.c | 2 + src/backend/distributed/utils/resource_lock.c | 26 ++ .../distributed/utils/shardinterval_utils.c | 38 +++ src/include/distributed/citus_nodefuncs.h | 2 + src/include/distributed/citus_nodes.h | 3 +- src/include/distributed/deparse_shard_query.h | 27 ++ .../distributed/multi_physical_planner.h | 2 +- src/include/distributed/multi_planner.h | 9 + .../distributed/multi_router_planner.h | 5 + src/include/distributed/resource_lock.h | 1 + src/include/distributed/shardinterval_utils.h | 2 + .../regress/expected/multi_router_planner.out | 13 +- src/test/regress/sql/multi_router_planner.sql | 3 +- 20 files changed, 470 insertions(+), 200 deletions(-) create mode 100644 src/backend/distributed/planner/deparse_shard_query.c create mode 100644 src/include/distributed/deparse_shard_query.h diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ebb99c26c..932bf6537 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -382,7 +382,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) * concurrently. */ - LockShardListResources(task->selectShardList, ExclusiveLock); + LockRelationShardListResources(task->relationShardList, ExclusiveLock); } } @@ -462,7 +462,7 @@ AcquireExecutorMultiShardLocks(List *taskList) * concurrently. */ - LockShardListResources(task->selectShardList, ExclusiveLock); + LockRelationShardListResources(task->relationShardList, ExclusiveLock); } } } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c new file mode 100644 index 000000000..90e070601 --- /dev/null +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -0,0 +1,238 @@ +/*------------------------------------------------------------------------- + * + * deparse_shard_query.c + * + * This file contains functions for deparsing shard queries. + * + * Copyright (c) 2014-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "c.h" + +#include "access/heapam.h" +#include "distributed/citus_nodefuncs.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/deparse_shard_query.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" +#include "lib/stringinfo.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "storage/lock.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + + +static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); + + +/* + * RebuildQueryStrings deparses the job query for each task to + * include execution-time changes such as function evaluation. 
+ */
+void
+RebuildQueryStrings(Query *originalQuery, List *taskList)
+{
+	ListCell *taskCell = NULL;
+	Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		StringInfo newQueryString = makeStringInfo();
+		Query *query = originalQuery;
+
+		if (task->insertSelectQuery)
+		{
+			/* for INSERT..SELECT, adjust shard names in SELECT part */
+			RangeTblEntry *copiedInsertRte = NULL;
+			RangeTblEntry *copiedSubqueryRte = NULL;
+			Query *copiedSubquery = NULL;
+			List *relationShardList = task->relationShardList;
+			ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
+
+			query = copyObject(originalQuery);
+
+			copiedInsertRte = ExtractInsertRangeTableEntry(query);
+			copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
+			copiedSubquery = copiedSubqueryRte->subquery;
+
+			AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
+			ReorderInsertSelectTargetLists(query, copiedInsertRte, copiedSubqueryRte);
+
+			/* setting an alias simplifies deparsing of RETURNING */
+			if (copiedInsertRte->alias == NULL)
+			{
+				Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
+				copiedInsertRte->alias = alias;
+			}
+
+			UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
+		}
+
+		deparse_shard_query(query, relationId, task->anchorShardId,
+							newQueryString);
+
+		ereport(DEBUG4, (errmsg("query before rebuilding: %s",
+								task->queryString)));
+		ereport(DEBUG4, (errmsg("query after rebuilding: %s",
+								newQueryString->data)));
+
+		task->queryString = newQueryString->data;
+	}
+}
+
+
+/*
+ * UpdateRelationToShardNames walks over the query tree and appends shard ids
+ * to relation names. It uses the relation ID of each range table entry to
+ * find the corresponding shard in relationShardList. If no mapping is found,
+ * then the relation is not referenced by the query or was pruned away
+ * entirely; in that case the relation is replaced by a set of NULL values,
+ * so that the query works on the worker without any problems.
+ */
+bool
+UpdateRelationToShardNames(Node *node, List *relationShardList)
+{
+	RangeTblEntry *newRte = NULL;
+	uint64 shardId = INVALID_SHARD_ID;
+	Oid relationId = InvalidOid;
+	Oid schemaId = InvalidOid;
+	char *relationName = NULL;
+	char *schemaName = NULL;
+	bool replaceRteWithNullValues = false;
+	ListCell *relationShardCell = NULL;
+	RelationShard *relationShard = NULL;
+
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	/* want to look at all RTEs, even in subqueries, CTEs and such */
+	if (IsA(node, Query))
+	{
+		return query_tree_walker((Query *) node, UpdateRelationToShardNames,
+								 relationShardList, QTW_EXAMINE_RTES);
+	}
+
+	if (!IsA(node, RangeTblEntry))
+	{
+		return expression_tree_walker(node, UpdateRelationToShardNames,
+									  relationShardList);
+	}
+
+	newRte = (RangeTblEntry *) node;
+
+	if (newRte->rtekind != RTE_RELATION)
+	{
+		return false;
+	}
+
+	/*
+	 * Search for the shard mapping associated with the RTE. There better be
+	 * one, otherwise this query wouldn't be eligible as a router query.
+	 *
+	 * FIXME: We should probably use a hashtable here, to do efficient
+	 * lookup.
+ */ + foreach(relationShardCell, relationShardList) + { + relationShard = (RelationShard *) lfirst(relationShardCell); + + if (newRte->relid == relationShard->relationId) + { + break; + } + + relationShard = NULL; + } + + replaceRteWithNullValues = relationShard == NULL || + relationShard->shardId == INVALID_SHARD_ID; + if (replaceRteWithNullValues) + { + ConvertRteToSubqueryWithEmptyResult(newRte); + return false; + } + + shardId = relationShard->shardId; + relationId = relationShard->relationId; + + relationName = get_rel_name(relationId); + AppendShardIdToName(&relationName, shardId); + + schemaId = get_rel_namespace(relationId); + schemaName = get_namespace_name(schemaId); + + ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL); + + return false; +} + + +/* + * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into + * subquery RTE that returns no results. + */ +static void +ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte) +{ + Relation relation = heap_open(rte->relid, NoLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + int columnCount = tupleDescriptor->natts; + int columnIndex = 0; + Query *subquery = NULL; + List *targetList = NIL; + FromExpr *joinTree = NULL; + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex]; + TargetEntry *targetEntry = NULL; + StringInfo resname = NULL; + Const *constValue = NULL; + + if (attributeForm->attisdropped) + { + continue; + } + + resname = makeStringInfo(); + constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod, + attributeForm->attcollation); + + appendStringInfo(resname, "%s", attributeForm->attname.data); + + targetEntry = makeNode(TargetEntry); + targetEntry->expr = (Expr *) constValue; + targetEntry->resno = columnIndex; + targetEntry->resname = resname->data; + + targetList = lappend(targetList, targetEntry); + } + + heap_close(relation, NoLock); + + joinTree = makeNode(FromExpr); + joinTree->quals = makeBoolConst(false, false); + + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_ORIGINAL; + subquery->canSetTag = true; + subquery->targetList = targetList; + subquery->jointree = joinTree; + + rte->rtekind = RTE_SUBQUERY; + rte->subquery = subquery; + rte->alias = copyObject(rte->eref); +} diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index c49d6814a..86545479a 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1478,6 +1478,11 @@ GetRTEIdentity(RangeTblEntry *rte) Assert(IsA(rte->values_lists, IntList)); Assert(list_length(rte->values_lists) == 1); + if (rte->values_lists == NULL) + { + return 0; + } + return linitial_int(rte->values_lists); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index cead4c175..9bfdc1912 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -23,6 +23,7 @@ #include "distributed/colocation_utils.h" #include "distributed/citus_nodes.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/deparse_shard_query.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" @@ -82,10 +83,6 @@ 
static Task * RouterModifyTaskForShardInterval(Query *originalQuery, RelationRestrictionContext * restrictionContext, uint32 taskIdIndex); -static void AddShardIntervalRestrictionToSelect(Query *subqery, - ShardInterval *shardInterval); -static RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); -static RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -105,13 +102,12 @@ static Task * RouterSelectTask(Query *originalQuery, static bool RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, List **placementList, uint64 *anchorShardId, - List **selectShardList, bool replacePrunedQueryWithDummy); + List **relationShardList, bool replacePrunedQueryWithDummy); +static bool RelationPrunesToMultipleShards(List *relationShardList); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); -static bool UpdateRelationNames(Node *node, - RelationRestrictionContext *restrictionContext); static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); @@ -295,7 +291,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, workerJob->jobQuery = originalQuery; /* for now we do not support any function evaluation */ - workerJob->requiresMasterEvaluation = false; + workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* and finally the multi plan */ multiPlan = CitusMakeNode(MultiPlan); @@ -339,7 +335,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter Task *modifyTask = NULL; List *selectPlacementList = NIL; uint64 selectAnchorShardId = INVALID_SHARD_ID; - List *selectShardList = NIL; + List *relationShardList = NIL; uint64 jobId = INVALID_JOB_ID; List *insertShardPlacementList = NULL; List *intersectedPlacementList = NULL; @@ -398,7 +394,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter */ routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext, &selectPlacementList, &selectAnchorShardId, - &selectShardList, replacePrunedQueryWithDummy); + &relationShardList, replacePrunedQueryWithDummy); if (!routerPlannable) { @@ -463,7 +459,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->upsertQuery = upsertQuery; - modifyTask->selectShardList = selectShardList; + modifyTask->relationShardList = relationShardList; return modifyTask; } @@ -479,7 +475,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter * The function expects and asserts that subquery's target list contains a partition * column value. Thus, this function should never be called with reference tables. 
*/ -static void +void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval) { List *targetList = subqery->targetList; @@ -594,7 +590,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval * Note that the function expects and asserts that the input query be * an INSERT...SELECT query. */ -static RangeTblEntry * +RangeTblEntry * ExtractSelectRangeTableEntry(Query *query) { List *fromList = NULL; @@ -617,7 +613,7 @@ ExtractSelectRangeTableEntry(Query *query) * Note that the function expects and asserts that the input query be * an INSERT...SELECT query. */ -static RangeTblEntry * +RangeTblEntry * ExtractInsertRangeTableEntry(Query *query) { int resultRelation = query->resultRelation; @@ -650,13 +646,13 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte, subquery = subqueryRte->subquery; - if (contain_mutable_functions((Node *) queryTree)) + if (contain_volatile_functions((Node *) queryTree)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning for the given " "modification"), errdetail( - "Stable and volatile functions are not allowed in INSERT ... " + "Volatile functions are not allowed in INSERT ... " "SELECT queries"))); } @@ -1917,14 +1913,14 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo StringInfo queryString = makeStringInfo(); bool upsertQuery = false; uint64 shardId = INVALID_SHARD_ID; - List *selectShardList = NIL; + List *relationShardList = NIL; bool replacePrunedQueryWithDummy = false; /* router planner should create task even if it deosn't hit a shard at all */ replacePrunedQueryWithDummy = true; queryRoutable = RouterSelectQuery(originalQuery, restrictionContext, - placementList, &shardId, &selectShardList, + placementList, &shardId, &relationShardList, replacePrunedQueryWithDummy); @@ -1943,6 +1939,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; + task->relationShardList = relationShardList; return task; } @@ -1954,12 +1951,14 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo * * On return true, all RTEs have been updated to point to the relevant shards in * the originalQuery. Also, placementList is filled with the list of worker nodes - * that has all the required shard placements for the query execution. Finally, - * anchorShardId is set to the first pruned shardId of the given query. + * that has all the required shard placements for the query execution. + * anchorShardId is set to the first pruned shardId of the given query. Finally, + * relationShardList is filled with the list of relation-to-shard mappings for + * the query. 
*/ static bool RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext, - List **placementList, uint64 *anchorShardId, List **selectShardList, + List **placementList, uint64 *anchorShardId, List **relationShardList, bool replacePrunedQueryWithDummy) { List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery, @@ -1982,7 +1981,9 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC foreach(prunedRelationShardListCell, prunedRelationShardList) { List *prunedShardList = (List *) lfirst(prunedRelationShardListCell); + ShardInterval *shardInterval = NULL; + RelationShard *relationShard = NULL; /* no shard is present or all shards are pruned out case will be handled later */ if (prunedShardList == NIL) @@ -2003,7 +2004,21 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC shardId = shardInterval->shardId; } - *selectShardList = lappend(*selectShardList, shardInterval); + /* add relation to shard mapping */ + relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = shardInterval->relationId; + relationShard->shardId = shardInterval->shardId; + + *relationShardList = lappend(*relationShardList, relationShard); + } + + /* + * We bail out if there are RTEs that prune multiple shards above, but + * there can also be multiple RTEs that reference the same relation. + */ + if (RelationPrunesToMultipleShards(*relationShardList)) + { + return false; } /* @@ -2049,7 +2064,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC return false; } - UpdateRelationNames((Node *) originalQuery, restrictionContext); + UpdateRelationToShardNames((Node *) originalQuery, *relationShardList); *placementList = workerList; *anchorShardId = shardId; @@ -2137,6 +2152,37 @@ TargetShardIntervalsForSelect(Query *query, } +/* + * RelationPrunesToMultipleShards returns true if the given list of + * relation-to-shard mappings contains at least two mappings with + * the same relation, but different shards. + */ +static bool +RelationPrunesToMultipleShards(List *relationShardList) +{ + ListCell *relationShardCell = NULL; + RelationShard *previousRelationShard = NULL; + + relationShardList = SortList(relationShardList, CompareRelationShards); + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + + if (previousRelationShard != NULL && + relationShard->relationId == previousRelationShard->relationId && + relationShard->shardId != previousRelationShard->shardId) + { + return true; + } + + previousRelationShard = relationShard; + } + + return false; +} + + /* * WorkersContainingAllShards returns list of shard placements that contain all * shard intervals provided to the function. It returns NIL if no placement exists. @@ -2233,164 +2279,6 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList) } -/* - * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into - * subquery RTE that returns no results. 
- */ -static void -ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte) -{ - Relation relation = heap_open(rte->relid, NoLock); - TupleDesc tupleDescriptor = RelationGetDescr(relation); - int columnCount = tupleDescriptor->natts; - int columnIndex = 0; - Query *subquery = NULL; - List *targetList = NIL; - FromExpr *joinTree = NULL; - - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex]; - TargetEntry *targetEntry = NULL; - StringInfo resname = NULL; - Const *constValue = NULL; - - if (attributeForm->attisdropped) - { - continue; - } - - resname = makeStringInfo(); - constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod, - attributeForm->attcollation); - - appendStringInfo(resname, "%s", attributeForm->attname.data); - - targetEntry = makeNode(TargetEntry); - targetEntry->expr = (Expr *) constValue; - targetEntry->resno = columnIndex; - targetEntry->resname = resname->data; - - targetList = lappend(targetList, targetEntry); - } - - heap_close(relation, NoLock); - - joinTree = makeNode(FromExpr); - joinTree->quals = makeBoolConst(false, false); - - subquery = makeNode(Query); - subquery->commandType = CMD_SELECT; - subquery->querySource = QSRC_ORIGINAL; - subquery->canSetTag = true; - subquery->targetList = targetList; - subquery->jointree = joinTree; - - rte->rtekind = RTE_SUBQUERY; - rte->subquery = subquery; - rte->alias = copyObject(rte->eref); -} - - -/* - * UpdateRelationNames walks over the query tree and appends shard ids to - * relations. It uses unique identity value to establish connection between a - * shard and the range table entry. If the range table id is not given a - * identity, than the relation is not referenced from the query, no connection - * could be found between a shard and this relation. Therefore relation is replaced - * by set of NULL values so that the query would work at worker without any problems. - * - */ -static bool -UpdateRelationNames(Node *node, RelationRestrictionContext *restrictionContext) -{ - RangeTblEntry *newRte = NULL; - uint64 shardId = INVALID_SHARD_ID; - Oid relationId = InvalidOid; - Oid schemaId = InvalidOid; - char *relationName = NULL; - char *schemaName = NULL; - ListCell *relationRestrictionCell = NULL; - RelationRestriction *relationRestriction = NULL; - List *shardIntervalList = NIL; - ShardInterval *shardInterval = NULL; - bool replaceRteWithNullValues = false; - - if (node == NULL) - { - return false; - } - - /* want to look at all RTEs, even in subqueries, CTEs and such */ - if (IsA(node, Query)) - { - return query_tree_walker((Query *) node, UpdateRelationNames, restrictionContext, - QTW_EXAMINE_RTES); - } - - if (!IsA(node, RangeTblEntry)) - { - return expression_tree_walker(node, UpdateRelationNames, restrictionContext); - } - - - newRte = (RangeTblEntry *) node; - - if (newRte->rtekind != RTE_RELATION) - { - return false; - } - - /* - * Search for the restrictions associated with the RTE. There better be - * some, otherwise this query wouldn't be elegible as a router query. - * - * FIXME: We should probably use a hashtable here, to do efficient - * lookup. 
- */ - foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) - { - relationRestriction = - (RelationRestriction *) lfirst(relationRestrictionCell); - - if (GetRTEIdentity(relationRestriction->rte) == GetRTEIdentity(newRte)) - { - break; - } - - relationRestriction = NULL; - } - - replaceRteWithNullValues = (relationRestriction == NULL) || - relationRestriction->prunedShardIntervalList == NIL; - - if (replaceRteWithNullValues) - { - ConvertRteToSubqueryWithEmptyResult(newRte); - return false; - } - - Assert(relationRestriction != NULL); - - shardIntervalList = relationRestriction->prunedShardIntervalList; - - Assert(list_length(shardIntervalList) == 1); - shardInterval = (ShardInterval *) linitial(shardIntervalList); - - shardId = shardInterval->shardId; - relationId = shardInterval->relationId; - relationName = get_rel_name(relationId); - AppendShardIdToName(&relationName, shardId); - - schemaId = get_rel_namespace(relationId); - schemaName = get_namespace_name(schemaId); - - ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL); - - return false; -} - - /* * RouterQueryJob creates a Job for the specified query to execute the * provided single shard select task. diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 4846437ec..03dda8a7c 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -14,6 +14,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_nodefuncs.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_planner.h" static const char *CitusNodeTagNamesD[] = { "MultiNode", @@ -31,7 +32,8 @@ static const char *CitusNodeTagNamesD[] = { "MultiPlan", "Task", "ShardInterval", - "ShardPlacement" + "ShardPlacement", + "RelationShard" }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -379,6 +381,7 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(MapMergeJob), DEFINE_NODE_METHODS(ShardPlacement), + DEFINE_NODE_METHODS(RelationShard), DEFINE_NODE_METHODS(Task), /* nodes with only output support */ diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index b4006db91..bf16c55f6 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -24,6 +24,7 @@ #include "distributed/citus_nodes.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/master_metadata_utility.h" #include "lib/stringinfo.h" #include "nodes/plannodes.h" @@ -475,6 +476,17 @@ OutShardPlacement(OUTFUNC_ARGS) } +void +OutRelationShard(OUTFUNC_ARGS) +{ + WRITE_LOCALS(RelationShard); + WRITE_NODE_TYPE("RELATIONSHARD"); + + WRITE_OID_FIELD(relationId); + WRITE_UINT64_FIELD(shardId); +} + + void OutTask(OUTFUNC_ARGS) { @@ -495,7 +507,7 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); WRITE_BOOL_FIELD(insertSelectQuery); - WRITE_NODE_FIELD(selectShardList); + WRITE_NODE_FIELD(relationShardList); } #if (PG_VERSION_NUM < 90600) @@ -612,6 +624,12 @@ outNode(StringInfo str, const void *obj) appendStringInfoChar(str, '}'); break; + case T_RelationShard: + appendStringInfoChar(str, '{'); + OutRelationShard(str, obj); + appendStringInfoChar(str, '}'); + break; + default: /* fall back into postgres' normal nodeToString 
machinery */ appendStringInfoString(str, nodeToString(obj)); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index c5ff9249b..ec9e9955c 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -14,6 +14,7 @@ #include #include "distributed/citus_nodefuncs.h" +#include "distributed/multi_planner.h" #include "nodes/parsenodes.h" #include "nodes/readfuncs.h" @@ -272,6 +273,18 @@ ReadShardPlacement(READFUNC_ARGS) } +READFUNC_RET +ReadRelationShard(READFUNC_ARGS) +{ + READ_LOCALS(RelationShard); + + READ_OID_FIELD(relationId); + READ_UINT64_FIELD(shardId); + + READ_DONE(); +} + + READFUNC_RET ReadTask(READFUNC_ARGS) { @@ -291,7 +304,7 @@ ReadTask(READFUNC_ARGS) READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); READ_BOOL_FIELD(insertSelectQuery); - READ_NODE_FIELD(selectShardList); + READ_NODE_FIELD(relationShardList); READ_DONE(); } diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index 811074078..b5ac97576 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1515,6 +1515,8 @@ CitusParseNodeString(void) return_value = ReadMapMergeJob(); else if (MATCH("SHARDPLACEMENT", 14)) return_value = ReadShardPlacement(); + else if (MATCH("RELATIONSHARD", 13)) + return_value = ReadRelationShard(); else if (MATCH("TASK", 4)) return_value = ReadTask(); /* XXX: END Citus Nodes */ diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 0f5b75202..127b9aca9 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -21,6 +21,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_planner.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" @@ -303,6 +304,31 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) } +/* + * LockRelationShards takes locks on all shards in a list of RelationShards + * to prevent concurrent DML statements on those shards. + */ +void +LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode) +{ + ListCell *relationShardCell = NULL; + + /* lock shards in a consistent order to prevent deadlock */ + relationShardList = SortList(relationShardList, CompareRelationShards); + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + uint64 shardId = relationShard->shardId; + + if (shardId != INVALID_SHARD_ID) + { + LockShardResource(shardId, lockMode); + } + } +} + + /* * LockMetadataSnapshot acquires a lock needed to serialize changes to pg_dist_node * and all other metadata changes. 
Operations that modify pg_dist_node should acquire diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 587783d2a..a28749483 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -15,6 +15,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_planner.h" #include "distributed/shardinterval_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/worker_protocol.h" @@ -132,6 +133,43 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement) } +/* + * CompareRelationShards is a comparison function for sorting relation + * to shard mappings by their relation ID and then shard ID. + */ +int +CompareRelationShards(const void *leftElement, const void *rightElement) +{ + RelationShard *leftRelationShard = *((RelationShard **) leftElement); + RelationShard *rightRelationShard = *((RelationShard **) rightElement); + Oid leftRelationId = leftRelationShard->relationId; + Oid rightRelationId = rightRelationShard->relationId; + int64 leftShardId = leftRelationShard->shardId; + int64 rightShardId = rightRelationShard->shardId; + + if (leftRelationId > rightRelationId) + { + return 1; + } + else if (leftRelationId < rightRelationId) + { + return -1; + } + else if (leftShardId > rightShardId) + { + return 1; + } + else if (leftShardId < rightShardId) + { + return -1; + } + else + { + return 0; + } +} + + /* * FindShardIntervalIndex finds index of given shard in sorted shard interval array. * diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 7dc8a3653..8d5c782d4 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -66,6 +66,7 @@ extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); +extern READFUNC_RET ReadRelationShard(READFUNC_ARGS); extern READFUNC_RET ReadTask(READFUNC_ARGS); extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); @@ -75,6 +76,7 @@ extern void OutMultiPlan(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); +extern void OutRelationShard(OUTFUNC_ARGS); extern void OutTask(OUTFUNC_ARGS); extern void OutMultiNode(OUTFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index f81b19173..05e7e0bd1 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -55,7 +55,8 @@ typedef enum CitusNodeTag T_MultiPlan, T_Task, T_ShardInterval, - T_ShardPlacement + T_ShardPlacement, + T_RelationShard } CitusNodeTag; diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h new file mode 100644 index 000000000..2fe00edbf --- /dev/null +++ b/src/include/distributed/deparse_shard_query.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * deparse_shard_query.h + * + * Declarations for public functions and types related to deparsing shard + * queries. + * + * Copyright (c) 2014-2016, Citus Data, Inc. 
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DEPARSE_SHARD_QUERY_H
+#define DEPARSE_SHARD_QUERY_H
+
+#include "c.h"
+
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
+#include "nodes/pg_list.h"
+
+
+extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
+extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
+
+
+#endif /* DEPARSE_SHARD_QUERY_H */
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 325c6513d..8ac674b0d 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -171,7 +171,7 @@ typedef struct Task
 	bool upsertQuery; /* only applies to modify tasks */
 	bool insertSelectQuery;
-	List *selectShardList; /* only applies INSERT/SELECT tasks */
+	List *relationShardList; /* only applies to INSERT/SELECT tasks */
 } Task;
diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h
index 4b3991434..7c96b55bf 100644
--- a/src/include/distributed/multi_planner.h
+++ b/src/include/distributed/multi_planner.h
@@ -13,6 +13,8 @@
 #include "nodes/plannodes.h"
 #include "nodes/relation.h"
 
+#include "distributed/citus_nodes.h"
+
 
 /* values used by jobs and tasks which do not require identifiers */
 #define INVALID_JOB_ID 0
@@ -38,6 +40,13 @@ typedef struct RelationRestriction
 	List *prunedShardIntervalList;
 } RelationRestriction;
 
+typedef struct RelationShard
+{
+	CitusNode type;
+	Oid relationId;
+	uint64 shardId;
+} RelationShard;
+
 extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
 								   ParamListInfo boundParams);
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index d3335cf32..a215a48df 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -37,5 +37,10 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
 											  RangeTblEntry *subqueryRte);
 extern bool InsertSelectQuery(Query *query);
 extern Oid ExtractFirstDistributedTableId(Query *query);
+extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
+extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
+extern void AddShardIntervalRestrictionToSelect(Query *subquery,
+												ShardInterval *shardInterval);
+
 
 #endif /* MULTI_ROUTER_PLANNER_H */
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index fd9836bae..a1e224f9e 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -79,6 +79,7 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
 /* Lock multiple shards for safe modification */
 extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
 extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
+extern void LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode);
 
 extern void LockMetadataSnapshot(LOCKMODE lockMode);
diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h
index 76e1f2133..53882080a 100644
--- a/src/include/distributed/shardinterval_utils.h
+++ b/src/include/distributed/shardinterval_utils.h
@@ -27,6 +27,8 @@ extern ShardInterval * LowestShardIntervalById(List *shardIntervalList);
 extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
 								 FmgrInfo *typeCompareFunction);
 extern
int CompareShardIntervalsById(const void *leftElement, const void *rightElement); +extern int CompareRelationShards(const void *leftElement, + const void *rightElement); extern int FindShardIntervalIndex(ShardInterval *shardInterval); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 94f80c503..fef68b320 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -334,7 +334,6 @@ id_title AS (SELECT id, title from articles_hash WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; DEBUG: predicate pruning for shardId 840001 DEBUG: predicate pruning for shardId 840000 -DEBUG: Found no worker with all shard placements ERROR: cannot perform distributed planning on this query DETAIL: Complex table expressions are currently unsupported -- recursive CTEs are supported when filtered on partition column @@ -420,8 +419,7 @@ DEBUG: predicate pruning for shardId 840006 ERROR: cannot perform distributed planning on this query DETAIL: Complex table expressions are currently unsupported -- logically wrong query, query involves different shards --- from the same table, but still router plannable due to --- shard being placed on the same worker. +-- from the same table WITH RECURSIVE hierarchy as ( SELECT *, 1 AS level FROM company_employees @@ -439,13 +437,8 @@ DEBUG: predicate pruning for shardId 840006 DEBUG: predicate pruning for shardId 840003 DEBUG: predicate pruning for shardId 840004 DEBUG: predicate pruning for shardId 840005 -DEBUG: Creating router plan -DEBUG: Plan is router executable - company_id | employee_id | manager_id | level -------------+-------------+------------+------- - 3 | 1 | 0 | 1 -(1 row) - +ERROR: cannot perform distributed planning on this query +DETAIL: Complex table expressions are currently unsupported -- grouping sets are supported on single shard SELECT id, substring(title, 2, 1) AS subtitle, count(*) diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index c69e979a5..84d8ba940 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -213,8 +213,7 @@ WITH RECURSIVE hierarchy as ( SELECT * FROM hierarchy WHERE LEVEL <= 2; -- logically wrong query, query involves different shards --- from the same table, but still router plannable due to --- shard being placed on the same worker. 
+-- from the same table WITH RECURSIVE hierarchy as ( SELECT *, 1 AS level FROM company_employees From 11031bcf5513ddca3be012cb70e906e61f7b08c9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 2 Dec 2016 16:42:22 +0100 Subject: [PATCH 2/2] Enable evaluation of stable functions in INSERT..SELECT --- .../executor/multi_router_executor.c | 29 +---- .../planner/multi_logical_planner.c | 48 ------- .../planner/multi_router_planner.c | 2 - src/backend/distributed/utils/citus_clauses.c | 52 +++++++- src/backend/distributed/utils/resource_lock.c | 4 +- .../distributed/multi_logical_planner.h | 2 - src/include/distributed/resource_lock.h | 2 +- .../regress/expected/multi_insert_select.out | 122 +++++++++++++++++- src/test/regress/sql/multi_insert_select.sql | 119 ++++++++++++++++- 9 files changed, 292 insertions(+), 88 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 932bf6537..9d3f99541 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -31,6 +31,7 @@ #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -38,6 +39,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" @@ -382,7 +384,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) * concurrently. */ - LockRelationShardListResources(task->relationShardList, ExclusiveLock); + LockRelationShardResources(task->relationShardList, ExclusiveLock); } } @@ -462,7 +464,7 @@ AcquireExecutorMultiShardLocks(List *taskList) * concurrently. 
*/ - LockRelationShardListResources(task->relationShardList, ExclusiveLock); + LockRelationShardResources(task->relationShardList, ExclusiveLock); } } } @@ -621,27 +623,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) if (requiresMasterEvaluation) { - ListCell *taskCell = NULL; - Query *query = workerJob->jobQuery; - Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid; + Query *jobQuery = workerJob->jobQuery; - ExecuteMasterEvaluableFunctions(query); - - foreach(taskCell, taskList) - { - Task *task = (Task *) lfirst(taskCell); - StringInfo newQueryString = makeStringInfo(); - - deparse_shard_query(query, relationId, task->anchorShardId, - newQueryString); - - ereport(DEBUG4, (errmsg("query before master evaluation: %s", - task->queryString))); - ereport(DEBUG4, (errmsg("query after master evaluation: %s", - newQueryString->data))); - - task->queryString = newQueryString->data; - } + ExecuteMasterEvaluableFunctions(jobQuery); + RebuildQueryStrings(jobQuery, taskList); } if (list_length(taskList) == 1) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 86545479a..e67a68b3f 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1451,42 +1451,6 @@ FindNodesOfType(MultiNode *node, int type) } -/* - * IdentifyRTE assigns an identifier to an RTE, for tracking purposes. - * - * To be able to track RTEs through postgres' query planning, which copies and - * duplicate, and modifies them, we sometimes need to figure out whether two - * RTEs are copies of the same original RTE. For that we, hackishly, use a - * field normally unused in RTE_RELATION RTEs. - * - * The assigned identifier better be unique within a plantree. - */ -void -IdentifyRTE(RangeTblEntry *rte, int identifier) -{ - Assert(rte->rtekind == RTE_RELATION); - Assert(rte->values_lists == NIL); - rte->values_lists = list_make1_int(identifier); -} - - -/* GetRTEIdentity returns the identity assigned with IdentifyRTE. */ -int -GetRTEIdentity(RangeTblEntry *rte) -{ - Assert(rte->rtekind == RTE_RELATION); - Assert(IsA(rte->values_lists, IntList)); - Assert(list_length(rte->values_lists) == 1); - - if (rte->values_lists == NULL) - { - return 0; - } - - return linitial_int(rte->values_lists); -} - - /* * NeedsDistributedPlanning checks if the passed in query is a query running * on a distributed table. If it is, we start distributed planning. @@ -1501,7 +1465,6 @@ NeedsDistributedPlanning(Query *queryTree) ListCell *rangeTableCell = NULL; bool hasLocalRelation = false; bool hasDistributedRelation = false; - int rteIdentifier = 1; if (commandType != CMD_SELECT && commandType != CMD_INSERT && commandType != CMD_UPDATE && commandType != CMD_DELETE) @@ -1522,17 +1485,6 @@ NeedsDistributedPlanning(Query *queryTree) if (IsDistributedTable(relationId)) { hasDistributedRelation = true; - - /* - * To be able to track individual RTEs through postgres' query - * planning, we need to be able to figure out whether an RTE is - * actually a copy of another, rather than a different one. We - * simply number the RTEs starting from 1. 
- */ - if (rangeTableEntry->rtekind == RTE_RELATION) - { - IdentifyRTE(rangeTableEntry, rteIdentifier++); - } } else { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9bfdc1912..b77eada52 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -289,8 +289,6 @@ CreateInsertSelectRouterPlan(Query *originalQuery, workerJob->dependedJobList = NIL; workerJob->jobId = jobId; workerJob->jobQuery = originalQuery; - - /* for now we do not support any function evaluation */ workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* and finally the multi plan */ diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 521ad3aa7..f0d632e5b 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -9,6 +9,7 @@ #include "postgres.h" #include "distributed/citus_clauses.h" +#include "distributed/multi_router_planner.h" #include "catalog/pg_type.h" #include "executor/executor.h" @@ -35,6 +36,8 @@ bool RequiresMasterEvaluation(Query *query) { ListCell *targetEntryCell = NULL; + ListCell *rteCell = NULL; + ListCell *cteCell = NULL; foreach(targetEntryCell, query->targetList) { @@ -46,6 +49,31 @@ RequiresMasterEvaluation(Query *query) } } + foreach(rteCell, query->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); + + if (rte->rtekind != RTE_SUBQUERY) + { + continue; + } + + if (RequiresMasterEvaluation(rte->subquery)) + { + return true; + } + } + + foreach(cteCell, query->cteList) + { + CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell); + + if (RequiresMasterEvaluation((Query *) expr->ctequery)) + { + return true; + } + } + if (query->jointree && query->jointree->quals) { return contain_mutable_functions((Node *) query->jointree->quals); @@ -64,7 +92,10 @@ ExecuteMasterEvaluableFunctions(Query *query) { CmdType commandType = query->commandType; ListCell *targetEntryCell = NULL; + ListCell *rteCell = NULL; + ListCell *cteCell = NULL; Node *modifiedNode = NULL; + bool insertSelectQuery = InsertSelectQuery(query); if (query->jointree && query->jointree->quals) { @@ -81,7 +112,7 @@ ExecuteMasterEvaluableFunctions(Query *query) continue; } - if (commandType == CMD_INSERT) + if (commandType == CMD_INSERT && !insertSelectQuery) { modifiedNode = EvaluateNodeIfReferencesFunction((Node *) targetEntry->expr); } @@ -93,11 +124,24 @@ ExecuteMasterEvaluableFunctions(Query *query) targetEntry->expr = (Expr *) modifiedNode; } - if (query->jointree) + foreach(rteCell, query->rtable) { - Assert(!contain_mutable_functions((Node *) (query->jointree->quals))); + RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); + + if (rte->rtekind != RTE_SUBQUERY) + { + continue; + } + + ExecuteMasterEvaluableFunctions(rte->subquery); + } + + foreach(cteCell, query->cteList) + { + CommonTableExpr *expr = (CommonTableExpr *) lfirst(cteCell); + + ExecuteMasterEvaluableFunctions((Query *) expr->ctequery); } - Assert(!contain_mutable_functions((Node *) (query->targetList))); } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 127b9aca9..766bc166a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -305,11 +305,11 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) /* - * 
LockRelationShards takes locks on all shards in a list of RelationShards + * LockRelationShardResources takes locks on all shards in a list of RelationShards * to prevent concurrent DML statements on those shards. */ void -LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode) +LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) { ListCell *relationShardCell = NULL; diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 909a88339..6487c7a28 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -182,8 +182,6 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree); extern bool NeedsDistributedPlanning(Query *queryTree); -extern int GetRTEIdentity(RangeTblEntry *rte); -extern void IdentifyRTE(RangeTblEntry *rte, int identifier); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index a1e224f9e..67d02a037 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -79,7 +79,7 @@ extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); /* Lock multiple shards for safe modification */ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); -extern void LockRelationShardListResources(List *relationShardList, LOCKMODE lockMode); +extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); extern void LockMetadataSnapshot(LOCKMODE lockMode); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 64422f174..c08a0c526 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -107,6 +107,86 @@ INSERT INTO raw_events_second SELECT * FROM raw_events_first; ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300004" DETAIL: Key (user_id, value_1)=(1, 10) already exists. 
CONTEXT: while executing command on localhost:57637 +-- stable functions should be allowed +INSERT INTO raw_events_second (user_id, time) +SELECT + user_id, now() +FROM + raw_events_first +WHERE + user_id < 0; +INSERT INTO raw_events_second (user_id) +SELECT + user_id +FROM + raw_events_first +WHERE + time > now() + interval '1 day'; +-- hide version-dependent PL/pgSQL context messages +\set VERBOSITY terse +-- make sure we evaluate stable functions on the master, once +CREATE OR REPLACE FUNCTION evaluate_on_master() +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN 0; +END; +$function$; +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master() +FROM + raw_events_first +WHERE + user_id < 0; +NOTICE: evaluating on master +-- make sure stable functions in CTEs are evaluated +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT evaluate_on_master()) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first +WHERE + user_id < 0; +NOTICE: evaluating on master +-- make sure we don't evaluate stable functions with column arguments +CREATE OR REPLACE FUNCTION evaluate_on_master(x int) +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN x; +END; +$function$; +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master(value_1) +FROM + raw_events_first +WHERE + user_id = 0; +WARNING: function public.evaluate_on_master(integer) does not exist +WARNING: function public.evaluate_on_master(integer) does not exist +ERROR: could not modify any active placements +\set VERBOSITY default +-- volatile functions should be disallowed +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, (random()*10)::int +FROM + raw_events_first; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT (random()*10)::int) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... 
SELECT queries -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -1861,12 +1941,44 @@ DEBUG: Plan is router executable DEBUG: CommitTransactionCommand DEBUG: CommitTransaction DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: --- set back to the default -SET citus.shard_count TO DEFAULT; +RESET client_min_messages; DEBUG: StartTransactionCommand DEBUG: StartTransaction DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: DEBUG: ProcessUtility -DEBUG: CommitTransactionCommand -DEBUG: CommitTransaction -DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children: +-- Stable function in default should be allowed +ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now(); +INSERT INTO table_with_defaults (store_id, first_name, last_name) +SELECT + store_id, 'first '||store_id, 'last '||store_id +FROM + table_with_defaults +GROUP BY + store_id, first_name, last_name; +-- Volatile function in default should be disallowed +CREATE TABLE table_with_serial ( + store_id int, + s bigserial +); +SELECT create_distributed_table('table_with_serial', 'store_id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO table_with_serial (store_id) +SELECT + store_id +FROM + table_with_defaults +GROUP BY + store_id; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Volatile functions are not allowed in INSERT ... SELECT queries +DROP TABLE raw_events_first CASCADE; +NOTICE: drop cascades to view test_view +DROP TABLE raw_events_second; +DROP TABLE reference_table; +DROP TABLE agg_events; +DROP TABLE table_with_defaults; +DROP TABLE table_with_serial; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index c327d829a..db1e0b7bf 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -56,6 +56,89 @@ WHERE -- see that we get unique vialitons INSERT INTO raw_events_second SELECT * FROM raw_events_first; +-- stable functions should be allowed +INSERT INTO raw_events_second (user_id, time) +SELECT + user_id, now() +FROM + raw_events_first +WHERE + user_id < 0; + +INSERT INTO raw_events_second (user_id) +SELECT + user_id +FROM + raw_events_first +WHERE + time > now() + interval '1 day'; + +-- hide version-dependent PL/pgSQL context messages +\set VERBOSITY terse + +-- make sure we evaluate stable functions on the master, once +CREATE OR REPLACE FUNCTION evaluate_on_master() +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN 0; +END; +$function$; + +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master() +FROM + raw_events_first +WHERE + user_id < 0; + +-- make sure stable functions in CTEs are evaluated +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT evaluate_on_master()) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first +WHERE + user_id < 0; + +-- make sure we don't evaluate stable functions with column arguments +CREATE OR REPLACE FUNCTION evaluate_on_master(x int) +RETURNS int LANGUAGE plpgsql STABLE +AS $function$ +BEGIN + RAISE NOTICE 'evaluating on master'; + RETURN x; +END; +$function$; + +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, evaluate_on_master(value_1) +FROM + raw_events_first +WHERE + 
user_id = 0; + +\set VERBOSITY default + +-- volatile functions should be disallowed +INSERT INTO raw_events_second (user_id, value_1) +SELECT + user_id, (random()*10)::int +FROM + raw_events_first; + +INSERT INTO raw_events_second (user_id, value_1) +WITH sub_cte AS (SELECT (random()*10)::int) +SELECT + user_id, (SELECT * FROM sub_cte) +FROM + raw_events_first; + + -- add one more row INSERT INTO raw_events_first (user_id, time) VALUES (7, now()); @@ -824,5 +907,37 @@ FROM GROUP BY last_name, store_id, first_name, default_2; --- set back to the default -SET citus.shard_count TO DEFAULT; +RESET client_min_messages; + +-- Stable function in default should be allowed +ALTER TABLE table_with_defaults ADD COLUMN t timestamptz DEFAULT now(); + +INSERT INTO table_with_defaults (store_id, first_name, last_name) +SELECT + store_id, 'first '||store_id, 'last '||store_id +FROM + table_with_defaults +GROUP BY + store_id, first_name, last_name; + +-- Volatile function in default should be disallowed +CREATE TABLE table_with_serial ( + store_id int, + s bigserial +); +SELECT create_distributed_table('table_with_serial', 'store_id'); + +INSERT INTO table_with_serial (store_id) +SELECT + store_id +FROM + table_with_defaults +GROUP BY + store_id; + +DROP TABLE raw_events_first CASCADE; +DROP TABLE raw_events_second; +DROP TABLE reference_table; +DROP TABLE agg_events; +DROP TABLE table_with_defaults; +DROP TABLE table_with_serial;
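
The following sketch illustrates the relation-to-shard mapping that the first patch records on each task. The shard identifiers (13300000, 13300004) are hypothetical, chosen in the style of the regression tests above; actual values depend on the cluster's shard placements.

-- Assuming the colocated distributed tables raw_events_first and
-- raw_events_second from the tests above, the router planner prunes each
-- relation to a single shard and stores one (relationId, shardId) pair
-- per relation in the task's relationShardList:
INSERT INTO raw_events_second SELECT * FROM raw_events_first WHERE user_id = 1;
-- UpdateRelationToShardNames then rewrites every relation RTE using that
-- mapping, so the query is deparsed for the worker roughly as:
-- INSERT INTO public.raw_events_second_13300004
-- SELECT * FROM public.raw_events_first_13300000 WHERE user_id = 1;

The same list also drives locking: LockRelationShardResources sorts it with CompareRelationShards before acquiring shard locks, so concurrent multi-shard modifications take their locks in a consistent order and cannot deadlock on each other.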
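
A similar sketch of the function-evaluation rule the second patch enables, mirroring the new regression tests and using the same hypothetical raw_events tables:

-- Stable functions are evaluated once on the master and shipped to the
-- workers as constants, so every inserted row sees the same timestamp:
INSERT INTO raw_events_second (user_id, time)
SELECT user_id, now() FROM raw_events_first;
-- Volatile functions would need a separate evaluation for every row on
-- every worker, so the planner rejects them:
INSERT INTO raw_events_second (user_id, value_1)
SELECT user_id, (random() * 10)::int FROM raw_events_first;
-- ERROR:  cannot perform distributed planning for the given modification
-- DETAIL:  Volatile functions are not allowed in INSERT ... SELECT queries

As the evaluate_on_master(x int) test shows, stable functions whose arguments reference columns cannot be pre-evaluated on the master and are shipped to the workers as-is.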