Merge pull request #651 from citusdata/501_user_original_query_in_modify_task

Make router planner use original query
Murat Tuncer 2016-07-19 07:45:40 +03:00 committed by GitHub
commit 25cbbf217b
13 changed files with 82 additions and 183 deletions

View File

@@ -112,6 +112,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
bool routerExecutablePlan = false;
instr_time planStart;
instr_time planDuration;
Query *originalQuery = NULL;
/* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query);
@@ -133,6 +134,12 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
return;
}
/*
* standard_planner scribbles on its input, but for deparsing we need the
* unmodified form. So copy once we're sure it's a distributed query.
*/
originalQuery = copyObject(query);
/* measure the full planning time to display in EXPLAIN ANALYZE */
INSTR_TIME_SET_CURRENT(planStart);
@@ -151,7 +158,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
}
}
multiPlan = CreatePhysicalPlan(query);
multiPlan = CreatePhysicalPlan(originalQuery, query);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);

View File

@@ -44,6 +44,17 @@ PlannedStmt *
multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *result = NULL;
bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
Query *originalQuery = NULL;
/*
* standard_planner scribbles on its input, but for deparsing we need the
* unmodified form. So copy once we're sure it's a distributed query.
*/
if (needsDistributedPlanning)
{
originalQuery = copyObject(parse);
}
/*
* First call into standard planner. This is required because the Citus
@@ -51,9 +62,9 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
*/
result = standard_planner(parse, cursorOptions, boundParams);
if (NeedsDistributedPlanning(parse))
if (needsDistributedPlanning)
{
MultiPlan *physicalPlan = CreatePhysicalPlan(parse);
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse);
/* store required data into the planned statement */
result = MultiQueryContainerNode(result, physicalPlan);
@@ -71,14 +82,14 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* physical plan process needed to produce distributed query plans.
*/
MultiPlan *
CreatePhysicalPlan(Query *parse)
CreatePhysicalPlan(Query *originalQuery, Query *query)
{
Query *parseCopy = copyObject(parse);
MultiPlan *physicalPlan = MultiRouterPlanCreate(parseCopy, TaskExecutorType);
MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query,
TaskExecutorType);
if (physicalPlan == NULL)
{
/* Create and optimize logical plan */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(parseCopy);
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(query);
MultiLogicalPlanOptimize(logicalPlan);
/*
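Pieced together from the hunks above, the new multi_planner flow is roughly as follows. This is a condensed sketch for orientation, not the verbatim source; the Citus include path is an assumption about the tree layout, and error handling is omitted.

#include "postgres.h"
#include "nodes/parsenodes.h"
#include "optimizer/planner.h"          /* standard_planner */
#include "distributed/multi_planner.h"  /* assumed location of the Citus planner API */

PlannedStmt *
multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
	PlannedStmt *result = NULL;
	Query *originalQuery = NULL;
	bool needsDistributedPlanning = NeedsDistributedPlanning(parse);

	/* copy first: standard_planner scribbles on parse in place */
	if (needsDistributedPlanning)
	{
		originalQuery = copyObject(parse);
	}

	result = standard_planner(parse, cursorOptions, boundParams);

	if (needsDistributedPlanning)
	{
		/* deparse from originalQuery; read planner decisions from parse */
		MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse);

		result = MultiQueryContainerNode(result, physicalPlan);
	}

	return result;
}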

View File

@@ -70,11 +70,7 @@ static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
static char MostPermissiveVolatileFlag(char left, char right);
static Task * RouterModifyTask(Query *query);
#if (PG_VERSION_NUM >= 90500)
static OnConflictExpr * RebuildOnConflict(Oid relationId,
OnConflictExpr *originalOnConflict);
#endif
static Task * RouterModifyTask(Query *originalQuery, Query *query);
static ShardInterval * TargetShardInterval(Query *query);
static List * QueryRestrictList(Query *query);
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
@@ -82,11 +78,10 @@ static ShardInterval * FastShardPruning(Oid distributedTableId,
Const *partionColumnValue);
static Oid ExtractFirstDistributedTableId(Query *query);
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *query);
static Task * RouterSelectTask(Query *originalQuery, Query *query);
static Job * RouterQueryJob(Query *query, Task *task);
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType);
static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
static void SetRangeTablesInherited(Query *query);
/*
@@ -97,7 +92,8 @@ static void SetRangeTablesInherited(query);
* returns NULL.
*/
MultiPlan *
MultiRouterPlanCreate(Query *query, MultiExecutorType taskExecutorType)
MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType)
{
Task *task = NULL;
Job *job = NULL;
@@ -122,17 +118,17 @@ MultiRouterPlanCreate(Query *query, MultiExecutorType taskExecutorType)
if (modifyTask)
{
ErrorIfModifyQueryNotSupported(query);
task = RouterModifyTask(query);
task = RouterModifyTask(originalQuery, query);
}
else
{
Assert(commandType == CMD_SELECT);
task = RouterSelectTask(query);
task = RouterSelectTask(originalQuery, query);
}
job = RouterQueryJob(query, task);
job = RouterQueryJob(originalQuery, task);
multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = job;
@@ -681,53 +677,33 @@ MostPermissiveVolatileFlag(char left, char right)
* shard-extended deparsed SQL to be run during execution.
*/
static Task *
RouterModifyTask(Query *query)
RouterModifyTask(Query *originalQuery, Query *query)
{
ShardInterval *shardInterval = TargetShardInterval(query);
uint64 shardId = shardInterval->shardId;
FromExpr *joinTree = NULL;
StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL;
bool upsertQuery = false;
bool requiresMasterEvaluation = RequiresMasterEvaluation(query);
bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
/*
* Convert the qualifiers to an explicitly and'd clause, which is needed
* before we deparse the query. This applies to SELECT, UPDATE and
* DELETE statements.
*/
joinTree = query->jointree;
if ((joinTree != NULL) && (joinTree->quals != NULL))
{
Node *whereClause = joinTree->quals;
if (IsA(whereClause, List))
{
joinTree->quals = (Node *) make_ands_explicit((List *) whereClause);
}
}
#if (PG_VERSION_NUM >= 90500)
if (query->onConflict != NULL)
if (originalQuery->onConflict != NULL)
{
RangeTblEntry *rangeTableEntry = NULL;
Oid relationId = shardInterval->relationId;
/* set the flag */
upsertQuery = true;
/* setting an alias simplifies deparsing of UPSERTs */
rangeTableEntry = linitial(query->rtable);
rangeTableEntry = linitial(originalQuery->rtable);
if (rangeTableEntry->alias == NULL)
{
Alias *alias = makeAlias(UPSERT_ALIAS, NIL);
rangeTableEntry->alias = alias;
}
/* some fields in the onConflict expression need to be updated for deparsing */
query->onConflict = RebuildOnConflict(relationId, query->onConflict);
}
#else
@@ -735,13 +711,7 @@ RouterModifyTask(Query *query)
upsertQuery = false;
#endif
/*
* We set the inh flag of all range table entries to true so that the deparser
* will not add the ONLY keyword to the resulting query string.
*/
SetRangeTablesInherited(query);
deparse_shard_query(query, shardInterval->relationId, shardId, queryString);
deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
modifyTask = CitusMakeNode(Task);
@@ -758,67 +728,6 @@ RouterModifyTask(Query *query)
}
#if (PG_VERSION_NUM >= 90500)
/*
* RebuildOnConflict rebuilds OnConflictExpr for correct deparsing. The function
* makes WHERE clause elements explicit and filters dropped columns
* from the target list.
*/
static OnConflictExpr *
RebuildOnConflict(Oid relationId, OnConflictExpr *originalOnConflict)
{
OnConflictExpr *updatedOnConflict = copyObject(originalOnConflict);
Node *onConflictWhere = updatedOnConflict->onConflictWhere;
List *onConflictSet = updatedOnConflict->onConflictSet;
TupleDesc distributedRelationDesc = NULL;
ListCell *targetEntryCell = NULL;
List *filteredOnConflictSet = NIL;
Form_pg_attribute *tableAttributes = NULL;
Relation distributedRelation = RelationIdGetRelation(relationId);
/* Convert onConflictWhere qualifiers to an explicitly and'd clause */
updatedOnConflict->onConflictWhere =
(Node *) make_ands_explicit((List *) onConflictWhere);
/*
* Here we handle dropped columns on the distributed table. onConflictSet
* includes the table attributes even if they are dropped,
* since it is expanded via expand_targetlist() in the standard planner.
*/
/* get the relation tuple descriptor and table attributes */
distributedRelationDesc = RelationGetDescr(distributedRelation);
tableAttributes = distributedRelationDesc->attrs;
foreach(targetEntryCell, onConflictSet)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
FormData_pg_attribute *tableAttribute = tableAttributes[targetEntry->resno - 1];
/* skip dropped columns */
if (tableAttribute->attisdropped)
{
continue;
}
/* we only want to deparse non-dropped columns */
filteredOnConflictSet = lappend(filteredOnConflictSet, targetEntry);
}
/* close distributedRelation to prevent leaks */
RelationClose(distributedRelation);
/* set onConflictSet again with the filtered list */
updatedOnConflict->onConflictSet = filteredOnConflictSet;
return updatedOnConflict;
}
#endif
/*
* TargetShardInterval determines the single shard targeted by a provided command.
* If no matching shards exist, or if the modification targets more than one
@@ -1070,7 +979,7 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
/* RouterSelectTask builds a Task to represent a single shard select query */
static Task *
RouterSelectTask(Query *query)
RouterSelectTask(Query *originalQuery, Query *query)
{
Task *task = NULL;
ShardInterval *shardInterval = TargetShardInterval(query);
@@ -1078,31 +987,13 @@ RouterSelectTask(Query *query)
uint64 shardId = INVALID_SHARD_ID;
bool upsertQuery = false;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
FromExpr *joinTree = NULL;
Assert(shardInterval != NULL);
Assert(commandType == CMD_SELECT);
shardId = shardInterval->shardId;
/*
* Convert the qualifiers to an explicitly and'd clause, which is needed
* before we deparse the query.
*/
joinTree = query->jointree;
if ((joinTree != NULL) && (joinTree->quals != NULL))
{
Node *whereClause = (Node *) make_ands_explicit((List *) joinTree->quals);
joinTree->quals = whereClause;
}
/*
* We set the inh flag of all range table entries to true so that the deparser
* will not add the ONLY keyword to the resulting query string.
*/
SetRangeTablesInherited(query);
deparse_shard_query(query, shardInterval->relationId, shardId, queryString);
deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
task = CitusMakeNode(Task);
@@ -1363,24 +1254,3 @@ ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
return false;
}
/*
* SetRangeTablesInherited sets the inh flag of all range table entries to true,
* so that the deparser will not add the ONLY keyword to the resulting query.
*/
static void
SetRangeTablesInherited(Query *query)
{
List *rangeTableList = query->rtable;
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind == RTE_RELATION)
{
rangeTableEntry->inh = true;
}
}
}
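The two fixups deleted above were only ever needed because the deparsed tree had already been through standard_planner: the planner flattens WHERE quals into an implicit-AND List and clears rte->inh on childless relations. A minimal sketch of what they did, assuming a post-planner Query; the helper name is hypothetical, not the removed code verbatim.

#include "postgres.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"  /* make_ands_explicit (location as of PostgreSQL 9.5) */

/* hypothetical helper combining the two removed fixups */
static void
NormalizeQueryForDeparse(Query *query)
{
	FromExpr *joinTree = query->jointree;
	ListCell *rangeTableCell = NULL;

	/* turn the planner's implicit-AND qual List back into an explicit AND tree */
	if (joinTree != NULL && joinTree->quals != NULL && IsA(joinTree->quals, List))
	{
		joinTree->quals = (Node *) make_ands_explicit((List *) joinTree->quals);
	}

	/* restore inh so the deparser does not prefix relation names with ONLY */
	foreach(rangeTableCell, query->rtable)
	{
		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

		if (rangeTableEntry->rtekind == RTE_RELATION)
		{
			rangeTableEntry->inh = true;
		}
	}
}

Deparsing the pre-planner originalQuery sidesteps both steps, which is why RouterModifyTask and RouterSelectTask can now call deparse_shard_query on it directly.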

View File

@@ -18,7 +18,7 @@ extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan;
extern struct MultiPlan * CreatePhysicalPlan(Query *parse);
extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan);

View File

@@ -30,7 +30,7 @@
#define UPSERT_ALIAS "citus_table_alias"
#endif
extern MultiPlan * MultiRouterPlanCreate(Query *query,
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);

View File

@@ -64,29 +64,23 @@ CREATE TEMPORARY SEQUENCE rows_inserted;
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
\gset
-- insert to proxy, again relying on default value
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
ERROR: null value in column "data" violates not-null constraint
ERROR: could not modify any active placements
\set VERBOSITY default
-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;
id | data
----+-----------------------------
1 | lorem ipsum
2 | dolor sit amet
3 | consectetur adipiscing elit
4 | sed do eiusmod
5 | tempor incididunt ut
6 | labore et dolore
(6 rows)
----+------
(0 rows)
-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');
currval
---------
6
(1 row)
ERROR: currval of sequence "rows_inserted" is not yet defined in this session
SET client_min_messages TO DEFAULT;

View File

@@ -439,6 +439,13 @@ UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id
UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :worker_1_port
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :worker_2_port
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :master_port
-- immutable function calls with vars are also allowed
UPDATE limit_orders
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;

View File

@@ -312,7 +312,7 @@ DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1)
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries other than in from-clause are currently unsupported
@@ -1041,6 +1041,7 @@ DEBUG: Plan is router executable
(5 rows)
-- parametric prepare queries can be router plannable
-- it will be fixed with another PR
PREPARE author_articles(int) as
SELECT *
FROM articles_hash
@@ -1049,15 +1050,9 @@ EXECUTE author_articles(1);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 840001
DEBUG: Plan is router executable
id | author_id | title | word_count
----+-----------+--------------+------------
1 | 1 | arsenous | 9572
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
41 | 1 | aznavour | 11814
(5 rows)
WARNING: there is no parameter $1
CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
-- queries inside plpgsql functions could be router plannable
CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$
DECLARE

View File

@@ -254,7 +254,7 @@ ORDER BY articles.id;
(50 rows)
-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM authors a2 WHERE a.id = a2.id LIMIT 1)
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles a;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries other than in from-clause are currently unsupported

View File

@@ -60,9 +60,11 @@ SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_
\gset
-- insert to proxy, again relying on default value
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
2 dolor sit amet
@@ -76,9 +78,11 @@ COPY pg_temp.:"proxy_tablename" FROM stdin;
\set VERBOSITY default
-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;
-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');
SET client_min_messages TO DEFAULT;

View File

@@ -321,6 +321,16 @@ UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246;
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :worker_1_port
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :worker_2_port
CREATE FUNCTION immutable_append(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
\c - - - :master_port
-- immutable function calls with vars are also allowed
UPDATE limit_orders
SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246;

View File

@@ -168,7 +168,7 @@ FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE tes
ORDER BY articles_hash.id;
-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1)
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a;
-- simple lookup query
@@ -475,6 +475,7 @@ PREPARE author_1_articles as
EXECUTE author_1_articles;
-- parametric prepare queries can be router plannable
-- it will be fixed with another PR
PREPARE author_articles(int) as
SELECT *
FROM articles_hash

View File

@@ -144,7 +144,7 @@ FROM articles, (SELECT id, word_count FROM articles) AS test WHERE test.id = art
ORDER BY articles.id;
-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM authors a2 WHERE a.id = a2.id LIMIT 1)
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles a;
-- joins are not supported between local and distributed tables