mirror of https://github.com/citusdata/citus.git
Reliably detect local tables in router queries in 9.4 (#4418)
Co-authored-by: Marco Slot <marco.slot@gmail.com>pull/4466/head
parent
59774b1dd4
commit
8fae9aae96
|
@ -45,6 +45,7 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/query_pushdown_planning.h"
|
||||
#include "distributed/query_utils.h"
|
||||
#include "distributed/recursive_planning.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/relation_restriction_equivalence.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
|
@ -2057,8 +2058,6 @@ PlanRouterQuery(Query *originalQuery,
|
|||
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
||||
Const **partitionValueConst)
|
||||
{
|
||||
RelationRestrictionContext *relationRestrictionContext =
|
||||
plannerRestrictionContext->relationRestrictionContext;
|
||||
bool isMultiShardQuery = false;
|
||||
DeferredErrorMessage *planningError = NULL;
|
||||
bool shardsPresent = false;
|
||||
|
@ -2171,7 +2170,12 @@ PlanRouterQuery(Query *originalQuery,
|
|||
/* we need anchor shard id for select queries with router planner */
|
||||
uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList);
|
||||
|
||||
bool hasLocalRelation = relationRestrictionContext->hasLocalRelation;
|
||||
/*
|
||||
* We keep track of hasLocalRelation in plannerRestrictionContext->
|
||||
* relationRestrictionContext, but in rare cases tables are excluded from
|
||||
* there (e.g. catalog table on inside of an inner join). So we recheck.
|
||||
*/
|
||||
bool hasLocalRelation = FindNodeCheck((Node *) originalQuery, IsLocalTableRTE);
|
||||
|
||||
List *taskPlacementList =
|
||||
CreateTaskPlacementListForShardIntervals(*prunedShardIntervalListList,
|
||||
|
|
|
@ -168,7 +168,6 @@ static bool ShouldRecursivelyPlanSetOperation(Query *query,
|
|||
RecursivePlanningContext *context);
|
||||
static void RecursivelyPlanSetOperations(Query *query, Node *node,
|
||||
RecursivePlanningContext *context);
|
||||
static bool IsLocalTableRTE(Node *node);
|
||||
static void RecursivelyPlanSubquery(Query *subquery,
|
||||
RecursivePlanningContext *planningContext);
|
||||
static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
|
||||
|
@ -1060,7 +1059,7 @@ RecursivelyPlanSetOperations(Query *query, Node *node,
|
|||
* is a range table relation entry that points to a local
|
||||
* relation (i.e., not a distributed relation).
|
||||
*/
|
||||
static bool
|
||||
bool
|
||||
IsLocalTableRTE(Node *node)
|
||||
{
|
||||
if (node == NULL)
|
||||
|
|
|
@ -33,5 +33,6 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
|
|||
List *resultIdList,
|
||||
bool useBinaryCopyFormat);
|
||||
extern bool GeneratingSubplans(void);
|
||||
extern bool IsLocalTableRTE(Node *node);
|
||||
|
||||
#endif /* RECURSIVE_PLANNING_H */
|
||||
|
|
|
@ -1641,6 +1641,76 @@ DETAIL: distribution column value: 1
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- if these queries get routed, they would fail since number1() does not exist
|
||||
-- on workers. This tests an exceptional case in which some local tables bypass
|
||||
-- checks.
|
||||
CREATE OR REPLACE FUNCTION number1(OUT datid int)
|
||||
RETURNS SETOF int
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
RETURN QUERY SELECT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT 1 FROM authors_reference r JOIN (
|
||||
SELECT s.datid FROM number1() s LEFT JOIN pg_database d ON s.datid = d.oid
|
||||
) num_db ON (r.id = num_db.datid) LIMIT 1;
|
||||
DEBUG: found no worker with all shard placements
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM public.number1() s(datid)
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT s.datid FROM ((SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) s LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- same scenario with a view
|
||||
CREATE VIEW num_db AS
|
||||
SELECT s.datid FROM number1() s LEFT JOIN pg_database d ON s.datid = d.oid;
|
||||
SELECT 1 FROM authors_reference r JOIN num_db ON (r.id = num_db.datid) LIMIT 1;
|
||||
DEBUG: found no worker with all shard placements
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM public.number1() s(datid)
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
DEBUG: generating subplan XXX_2 for subquery SELECT s.datid FROM ((SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) s LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- with a CTE in a view
|
||||
WITH cte AS (SELECT * FROM num_db)
|
||||
SELECT 1 FROM authors_reference r JOIN cte ON (r.id = cte.datid) LIMIT 1;
|
||||
DEBUG: found no worker with all shard placements
|
||||
DEBUG: generating subplan XXX_1 for CTE cte: SELECT datid FROM (SELECT s.datid FROM (public.number1() s(datid) LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))) num_db
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) cte ON ((r.id OPERATOR(pg_catalog.=) cte.datid))) LIMIT 1
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- hide changes between major versions
|
||||
RESET client_min_messages;
|
||||
-- with pg_stat_activity view
|
||||
WITH pg_stat_activity AS (
|
||||
SELECT
|
||||
pg_stat_activity.datid,
|
||||
pg_stat_activity.application_name,
|
||||
pg_stat_activity.query
|
||||
FROM pg_catalog.pg_stat_activity
|
||||
)
|
||||
SELECT 1 FROM authors_reference r LEFT JOIN pg_stat_activity ON (r.id = pg_stat_activity.datid) LIMIT 1;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SET client_min_messages TO DEBUG2;
|
||||
-- CTEs with where false
|
||||
-- terse because distribution column inference varies between pg11 & pg12
|
||||
\set VERBOSITY terse
|
||||
|
@ -2525,6 +2595,8 @@ DROP FUNCTION author_articles_max_id();
|
|||
DROP FUNCTION author_articles_id_word_count();
|
||||
DROP MATERIALIZED VIEW mv_articles_hash_empty;
|
||||
DROP MATERIALIZED VIEW mv_articles_hash_data;
|
||||
DROP VIEW num_db;
|
||||
DROP FUNCTION number1();
|
||||
DROP TABLE articles_hash;
|
||||
DROP TABLE articles_single_shard_hash;
|
||||
DROP TABLE authors_hash;
|
||||
|
|
|
@ -711,6 +711,47 @@ ORDER BY id;
|
|||
INTERSECT
|
||||
(SELECT * FROM articles_hash WHERE author_id = 2 and 1=0);
|
||||
|
||||
-- if these queries get routed, they would fail since number1() does not exist
|
||||
-- on workers. This tests an exceptional case in which some local tables bypass
|
||||
-- checks.
|
||||
CREATE OR REPLACE FUNCTION number1(OUT datid int)
|
||||
RETURNS SETOF int
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
RETURN QUERY SELECT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT 1 FROM authors_reference r JOIN (
|
||||
SELECT s.datid FROM number1() s LEFT JOIN pg_database d ON s.datid = d.oid
|
||||
) num_db ON (r.id = num_db.datid) LIMIT 1;
|
||||
|
||||
-- same scenario with a view
|
||||
CREATE VIEW num_db AS
|
||||
SELECT s.datid FROM number1() s LEFT JOIN pg_database d ON s.datid = d.oid;
|
||||
|
||||
SELECT 1 FROM authors_reference r JOIN num_db ON (r.id = num_db.datid) LIMIT 1;
|
||||
|
||||
-- with a CTE in a view
|
||||
WITH cte AS (SELECT * FROM num_db)
|
||||
SELECT 1 FROM authors_reference r JOIN cte ON (r.id = cte.datid) LIMIT 1;
|
||||
|
||||
-- hide changes between major versions
|
||||
RESET client_min_messages;
|
||||
|
||||
-- with pg_stat_activity view
|
||||
WITH pg_stat_activity AS (
|
||||
SELECT
|
||||
pg_stat_activity.datid,
|
||||
pg_stat_activity.application_name,
|
||||
pg_stat_activity.query
|
||||
FROM pg_catalog.pg_stat_activity
|
||||
)
|
||||
SELECT 1 FROM authors_reference r LEFT JOIN pg_stat_activity ON (r.id = pg_stat_activity.datid) LIMIT 1;
|
||||
|
||||
SET client_min_messages TO DEBUG2;
|
||||
|
||||
-- CTEs with where false
|
||||
-- terse because distribution column inference varies between pg11 & pg12
|
||||
\set VERBOSITY terse
|
||||
|
@ -1182,6 +1223,9 @@ DROP FUNCTION author_articles_id_word_count();
|
|||
DROP MATERIALIZED VIEW mv_articles_hash_empty;
|
||||
DROP MATERIALIZED VIEW mv_articles_hash_data;
|
||||
|
||||
DROP VIEW num_db;
|
||||
DROP FUNCTION number1();
|
||||
|
||||
DROP TABLE articles_hash;
|
||||
DROP TABLE articles_single_shard_hash;
|
||||
DROP TABLE authors_hash;
|
||||
|
|
Loading…
Reference in New Issue