mirror of https://github.com/citusdata/citus.git
address feedback from Sait Talha & Hadi
parent
4f7989ad8e
commit
ef778c1cd7
|
@ -2646,8 +2646,8 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
|
|
||||||
Assert(anchorShardId != INVALID_SHARD_ID);
|
Assert(anchorShardId != INVALID_SHARD_ID);
|
||||||
|
|
||||||
List *selectPlacementList = PlacementsForWorkersContainingAllShards(taskShardList);
|
List *taskPlacementList = PlacementsForWorkersContainingAllShards(taskShardList);
|
||||||
if (list_length(selectPlacementList) == 0)
|
if (list_length(taskPlacementList) == 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
|
ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
|
||||||
"shards in the query")));
|
"shards in the query")));
|
||||||
|
@ -2683,7 +2683,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
|
|
||||||
subqueryTask->dependentTaskList = NULL;
|
subqueryTask->dependentTaskList = NULL;
|
||||||
subqueryTask->anchorShardId = anchorShardId;
|
subqueryTask->anchorShardId = anchorShardId;
|
||||||
subqueryTask->taskPlacementList = selectPlacementList;
|
subqueryTask->taskPlacementList = taskPlacementList;
|
||||||
subqueryTask->relationShardList = relationShardList;
|
subqueryTask->relationShardList = relationShardList;
|
||||||
|
|
||||||
return subqueryTask;
|
return subqueryTask;
|
||||||
|
|
|
@ -2179,7 +2179,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
* accessed locally.
|
* accessed locally.
|
||||||
*
|
*
|
||||||
* If generateDummyPlacement is true and there are no shards that need to be
|
* If generateDummyPlacement is true and there are no shards that need to be
|
||||||
* accessed to answer the query (shardsPresent is false), then as single
|
* accessed to answer the query (shardsPresent is false), then a single
|
||||||
* placement is returned that is either local or follows a round-robin policy.
|
* placement is returned that is either local or follows a round-robin policy.
|
||||||
* A typical example is a router query that only reads an intermediate result.
|
* A typical example is a router query that only reads an intermediate result.
|
||||||
* This will happen on the coordinator, unless the user wants to balance the
|
* This will happen on the coordinator, unless the user wants to balance the
|
||||||
|
@ -3296,7 +3296,7 @@ MultiRouterPlannableQuery(Query *query)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* local tables are allowed if there are no distributed tables */
|
/* local tables are not allowed if there are distributed tables */
|
||||||
if (hasLocalTable && hasDistributedTable)
|
if (hasLocalTable && hasDistributedTable)
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
|
|
@ -29,20 +29,6 @@ BEGIN
|
||||||
END LOOP;
|
END LOOP;
|
||||||
RETURN;
|
RETURN;
|
||||||
END; $$ language plpgsql;
|
END; $$ language plpgsql;
|
||||||
-- Is a distributed plan?
|
|
||||||
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
|
|
||||||
RETURNS BOOLEAN AS $$
|
|
||||||
DECLARE
|
|
||||||
query_plan TEXT;
|
|
||||||
BEGIN
|
|
||||||
FOR query_plan IN execute explain_commmand LOOP
|
|
||||||
IF query_plan LIKE '%Task Count:%'
|
|
||||||
THEN
|
|
||||||
RETURN TRUE;
|
|
||||||
END IF;
|
|
||||||
END LOOP;
|
|
||||||
RETURN FALSE;
|
|
||||||
END; $$ language plpgsql;
|
|
||||||
-- helper function to quickly run SQL on the whole cluster
|
-- helper function to quickly run SQL on the whole cluster
|
||||||
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||||
RETURNS void LANGUAGE plpgsql AS $$
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
|
|
@ -386,22 +386,12 @@ $Q$);
|
||||||
DROP VIEW numbers_v, local_table_v;
|
DROP VIEW numbers_v, local_table_v;
|
||||||
--
|
--
|
||||||
-- Joins between reference tables and materialized views are allowed to
|
-- Joins between reference tables and materialized views are allowed to
|
||||||
-- be planned locally.
|
-- be planned to be executed locally.
|
||||||
--
|
--
|
||||||
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
|
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
|
||||||
NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
|
NOTICE: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
|
||||||
REFRESH MATERIALIZED VIEW numbers_v;
|
REFRESH MATERIALIZED VIEW numbers_v;
|
||||||
NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10))
|
NOTICE: executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10))
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
|
||||||
$Q$);
|
|
||||||
plan_is_distributed
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
t
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
BEGIN;
|
|
||||||
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
||||||
NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a)))
|
NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a)))
|
||||||
a | b | a
|
a | b | a
|
||||||
|
@ -410,7 +400,6 @@ NOTICE: executing the command locally: SELECT squares.a, squares.b, numbers_v.a
|
||||||
2 | 4 | 2
|
2 | 4 | 2
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
END;
|
|
||||||
--
|
--
|
||||||
-- Joins between reference tables, local tables, and function calls
|
-- Joins between reference tables, local tables, and function calls
|
||||||
-- are allowed
|
-- are allowed
|
||||||
|
@ -424,24 +413,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- but it should be okay if the function call is not a data source
|
-- and it should be okay if the function call is not a data source
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
|
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
|
||||||
$Q$);
|
NOTICE: executing the command locally: SELECT abs(a.a) AS abs FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a)
|
||||||
plan_is_distributed
|
abs
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
2
|
||||||
(1 row)
|
20
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
|
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
|
||||||
$Q$);
|
NOTICE: executing the command locally: SELECT a.a FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a) ORDER BY (abs(a.a))
|
||||||
plan_is_distributed
|
a
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
2
|
||||||
(1 row)
|
20
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
TRUNCATE local_table;
|
TRUNCATE local_table;
|
||||||
TRUNCATE numbers;
|
TRUNCATE numbers;
|
||||||
|
|
|
@ -33,21 +33,6 @@ BEGIN
|
||||||
RETURN;
|
RETURN;
|
||||||
END; $$ language plpgsql;
|
END; $$ language plpgsql;
|
||||||
|
|
||||||
-- Is a distributed plan?
|
|
||||||
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
|
|
||||||
RETURNS BOOLEAN AS $$
|
|
||||||
DECLARE
|
|
||||||
query_plan TEXT;
|
|
||||||
BEGIN
|
|
||||||
FOR query_plan IN execute explain_commmand LOOP
|
|
||||||
IF query_plan LIKE '%Task Count:%'
|
|
||||||
THEN
|
|
||||||
RETURN TRUE;
|
|
||||||
END IF;
|
|
||||||
END LOOP;
|
|
||||||
RETURN FALSE;
|
|
||||||
END; $$ language plpgsql;
|
|
||||||
|
|
||||||
-- helper function to quickly run SQL on the whole cluster
|
-- helper function to quickly run SQL on the whole cluster
|
||||||
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||||
RETURNS void LANGUAGE plpgsql AS $$
|
RETURNS void LANGUAGE plpgsql AS $$
|
||||||
|
|
|
@ -185,18 +185,12 @@ DROP VIEW numbers_v, local_table_v;
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Joins between reference tables and materialized views are allowed to
|
-- Joins between reference tables and materialized views are allowed to
|
||||||
-- be planned locally.
|
-- be planned to be executed locally.
|
||||||
--
|
--
|
||||||
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
|
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
|
||||||
REFRESH MATERIALIZED VIEW numbers_v;
|
REFRESH MATERIALIZED VIEW numbers_v;
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
|
||||||
$Q$);
|
|
||||||
|
|
||||||
BEGIN;
|
|
||||||
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
|
||||||
END;
|
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Joins between reference tables, local tables, and function calls
|
-- Joins between reference tables, local tables, and function calls
|
||||||
|
@ -206,16 +200,10 @@ SELECT count(*)
|
||||||
FROM local_table a, numbers b, generate_series(1, 10) c
|
FROM local_table a, numbers b, generate_series(1, 10) c
|
||||||
WHERE a.a = b.a AND a.a = c;
|
WHERE a.a = b.a AND a.a = c;
|
||||||
|
|
||||||
-- but it should be okay if the function call is not a data source
|
-- and it should be okay if the function call is not a data source
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
|
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
|
||||||
$Q$);
|
|
||||||
|
|
||||||
SELECT public.plan_is_distributed($Q$
|
|
||||||
EXPLAIN (COSTS FALSE)
|
|
||||||
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
|
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
|
||||||
$Q$);
|
|
||||||
|
|
||||||
TRUNCATE local_table;
|
TRUNCATE local_table;
|
||||||
TRUNCATE numbers;
|
TRUNCATE numbers;
|
||||||
|
|
Loading…
Reference in New Issue