mirror of https://github.com/citusdata/citus.git
Merge pull request #3564 from citusdata/fix-early-exits-on-subplan-pruning
Fix early exits on intermediate result pruning. There are 2 problems with our early exit strategy that this commit fixes: 1- When we decide that a subplan's results are sent to all worker nodes, we used to skip traversing the whole distributed plan, instead of skipping only the subplan. 2- We used to consider all available nodes in the cluster (secondaries and inactive nodes as well as active primaries) when deciding on the early exit strategy. This resulted in failures to early exit when there are secondaries or inactive nodes. pull/3571/head^2
commit
6e6763678c
|
@ -106,7 +106,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
|||
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
||||
List *subPlanList = distributedPlan->subPlanList;
|
||||
ListCell *subPlanCell = NULL;
|
||||
int workerNodeCount = GetWorkerNodeCount();
|
||||
int workerNodeCount = ActiveReadableWorkerNodeCount();
|
||||
|
||||
foreach(subPlanCell, usedSubPlanNodeList)
|
||||
{
|
||||
|
@ -124,13 +124,14 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
|||
}
|
||||
|
||||
/*
|
||||
* There is no need to traverse the whole plan if the intermediate result
|
||||
* will be written to a local file and send to all nodes
|
||||
* There is no need to traverse the subplan if the intermediate result
|
||||
* will be written to a local file and sent to all nodes. Note that the
|
||||
* remaining subplans in the distributed plan should still be traversed.
|
||||
*/
|
||||
if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile)
|
||||
{
|
||||
elog(DEBUG4, "Subplan %s is used in all workers", resultId);
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
|
||||
{
|
||||
|
@ -139,7 +140,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
|||
*
|
||||
* If we have reference tables in the distributed plan, all the
|
||||
* workers will be in the node list. We can improve intermediate result
|
||||
* pruning by deciding which reference table shard will be accessed earlier
|
||||
* pruning by deciding which reference table shard will be accessed earlier.
|
||||
*/
|
||||
AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount);
|
||||
|
||||
|
|
|
@ -933,6 +933,105 @@ DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- test case for issue #3556
|
||||
CREATE TABLE accounts (id text PRIMARY KEY);
|
||||
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
|
||||
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "accounts_pkey" for table "accounts"
|
||||
DEBUG: building index "accounts_pkey" on table "accounts" serially
|
||||
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
|
||||
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
|
||||
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "stats_pkey" for table "stats"
|
||||
DEBUG: building index "stats_pkey" on table "stats" serially
|
||||
SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO accounts (id) VALUES ('foo');
|
||||
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
|
||||
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_2 will be written to local file
|
||||
DEBUG: Subplan XXX_3 will be written to local file
|
||||
DEBUG: Subplan XXX_4 will be written to local file
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
-- confirm that the pruning works well when using round-robin as well
|
||||
SET citus.task_assignment_policy to 'round-robin';
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
|
||||
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
SET citus.task_assignment_policy to DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned;
|
||||
DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
|
||||
DROP SCHEMA intermediate_result_pruning;
|
||||
|
|
|
@ -61,6 +61,23 @@ SELECT create_distributed_function('get_local_node_id_volatile()');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- test case for issue #3556
|
||||
CREATE TABLE accounts (id text PRIMARY KEY);
|
||||
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
|
||||
SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO accounts (id) VALUES ('foo');
|
||||
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
|
||||
-- connection worker and get ready for the tests
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO local_shard_execution;
|
||||
|
@ -89,6 +106,50 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R
|
|||
RETURN shard_is_local;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- test case for issue #3556
|
||||
SET citus.log_intermediate_results TO TRUE;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
DEBUG: CTE joined_stats_cte_1 is going to be inlined via distributed planning
|
||||
DEBUG: CTE joined_stats_cte_2 is going to be inlined via distributed planning
|
||||
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts
|
||||
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts
|
||||
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (local_shard_execution.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
|
||||
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_2 will be written to local file
|
||||
DEBUG: Subplan XXX_3 will be written to local file
|
||||
DEBUG: Subplan XXX_4 will be written to local file
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
SET citus.log_intermediate_results TO DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- pick some example values that reside on the shards locally and remote
|
||||
-- distribution key values of 1,6, 500 and 701 are LOCAL to shards,
|
||||
-- we'll use these values in the tests
|
||||
|
@ -660,7 +721,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation() AS $
|
|||
IF nodeId <= 0 THEN
|
||||
RAISE NOTICE 'unexpected node id';
|
||||
END IF;
|
||||
|
||||
|
||||
-- regular router
|
||||
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = 1;
|
||||
IF nodeId <= 0 THEN
|
||||
|
@ -701,7 +762,7 @@ CREATE OR REPLACE PROCEDURE only_local_execution_with_function_evaluation_param(
|
|||
IF nodeId <= 0 THEN
|
||||
RAISE NOTICE 'unexpected node id';
|
||||
END IF;
|
||||
|
||||
|
||||
-- regular router
|
||||
SELECT get_local_node_id_volatile() INTO nodeId FROM distributed_table d1 JOIN distributed_table d2 USING (key) WHERE d1.key = $1;
|
||||
IF nodeId <= 0 THEN
|
||||
|
|
|
@ -21,6 +21,7 @@ SELECT create_distributed_table('source_table', 'a');
|
|||
|
||||
INSERT INTO dest_table (a, b) VALUES (1, 1);
|
||||
INSERT INTO dest_table (a, b) VALUES (2, 1);
|
||||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||
-- simluate actually having secondary nodes
|
||||
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
||||
|
@ -73,9 +74,95 @@ DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT DISTINCT des
|
|||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) foo ORDER BY a
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- intermediate result pruning should still work
|
||||
SET citus.log_intermediate_results TO TRUE;
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH dest_table_cte AS (
|
||||
SELECT a
|
||||
FROM dest_table
|
||||
),
|
||||
joined_source_table_cte_1 AS (
|
||||
SELECT b, a
|
||||
FROM source_table
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
),
|
||||
joined_source_table_cte_2 AS (
|
||||
SELECT b, a
|
||||
FROM joined_source_table_cte_1
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
)
|
||||
SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL))
|
||||
FROM dest_table_cte
|
||||
INNER JOIN joined_source_table_cte_2 USING (a)
|
||||
) inner_query;
|
||||
DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning
|
||||
DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning
|
||||
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
|
||||
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
|
||||
DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
|
||||
DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
|
||||
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
|
||||
DEBUG: Subplan XXX_1 will be written to local file
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_2 will be written to local file
|
||||
DEBUG: Subplan XXX_3 will be written to local file
|
||||
DEBUG: Subplan XXX_4 will be written to local file
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
-- confirm that the pruning works well when using round-robin as well
|
||||
SET citus.task_assignment_policy to 'round-robin';
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH dest_table_cte AS (
|
||||
SELECT a
|
||||
FROM dest_table
|
||||
),
|
||||
joined_source_table_cte_1 AS (
|
||||
SELECT b, a
|
||||
FROM source_table
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
),
|
||||
joined_source_table_cte_2 AS (
|
||||
SELECT b, a
|
||||
FROM joined_source_table_cte_1
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
)
|
||||
SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL))
|
||||
FROM dest_table_cte
|
||||
INNER JOIN joined_source_table_cte_2 USING (a)
|
||||
) inner_query;
|
||||
DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning
|
||||
DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning
|
||||
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
|
||||
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
|
||||
DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
|
||||
DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
|
||||
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a))
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
SET citus.task_assignment_policy to DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
SET citus.log_intermediate_results TO DEFAULT;
|
||||
-- insert into is definitely not allowed
|
||||
INSERT INTO dest_table (a, b)
|
||||
SELECT a, b FROM source_table;
|
||||
|
@ -83,4 +170,4 @@ ERROR: writing to worker nodes is not currently allowed
|
|||
DETAIL: citus.use_secondary_nodes is set to 'always'
|
||||
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
|
||||
UPDATE pg_dist_node SET noderole = 'primary';
|
||||
DROP TABLE dest_table;
|
||||
DROP TABLE source_table, dest_table;
|
||||
|
|
|
@ -551,7 +551,65 @@ WHERE
|
|||
range_column IN ('A', 'E') AND
|
||||
range_partitioned.data IN (SELECT data FROM some_data);
|
||||
|
||||
|
||||
-- test case for issue #3556
|
||||
CREATE TABLE accounts (id text PRIMARY KEY);
|
||||
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
|
||||
|
||||
SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
|
||||
SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
|
||||
|
||||
INSERT INTO accounts (id) VALUES ('foo');
|
||||
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
|
||||
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
|
||||
-- confirm that the pruning works well when using round-robin as well
|
||||
SET citus.task_assignment_policy to 'round-robin';
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
|
||||
SET citus.task_assignment_policy to DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned;
|
||||
DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
|
||||
DROP SCHEMA intermediate_result_pruning;
|
||||
|
||||
|
|
|
@ -46,6 +46,16 @@ BEGIN
|
|||
END; $$ language plpgsql VOLATILE;
|
||||
SELECT create_distributed_function('get_local_node_id_volatile()');
|
||||
|
||||
-- test case for issue #3556
|
||||
CREATE TABLE accounts (id text PRIMARY KEY);
|
||||
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
|
||||
|
||||
SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
|
||||
SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
|
||||
|
||||
INSERT INTO accounts (id) VALUES ('foo');
|
||||
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
|
||||
|
||||
-- connection worker and get ready for the tests
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO local_shard_execution;
|
||||
|
@ -76,6 +86,35 @@ CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) R
|
|||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- test case for issue #3556
|
||||
SET citus.log_intermediate_results TO TRUE;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH accounts_cte AS (
|
||||
SELECT id AS account_id
|
||||
FROM accounts
|
||||
),
|
||||
joined_stats_cte_1 AS (
|
||||
SELECT spent, account_id
|
||||
FROM stats
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
),
|
||||
joined_stats_cte_2 AS (
|
||||
SELECT spent, account_id
|
||||
FROM joined_stats_cte_1
|
||||
INNER JOIN accounts_cte USING (account_id)
|
||||
)
|
||||
SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
|
||||
FROM accounts_cte
|
||||
INNER JOIN joined_stats_cte_2 USING (account_id)
|
||||
) inner_query;
|
||||
|
||||
SET citus.log_intermediate_results TO DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
|
||||
-- pick some example values that reside on the shards locally and remote
|
||||
|
||||
-- distribution key values of 1,6, 500 and 701 are LOCAL to shards,
|
||||
|
|
|
@ -16,6 +16,7 @@ SELECT create_distributed_table('source_table', 'a');
|
|||
INSERT INTO dest_table (a, b) VALUES (1, 1);
|
||||
INSERT INTO dest_table (a, b) VALUES (2, 1);
|
||||
|
||||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||
|
||||
-- simluate actually having secondary nodes
|
||||
|
@ -49,7 +50,59 @@ FROM
|
|||
dest_table.b IN (1,2,3,4)
|
||||
) SELECT * FROM cte ORDER BY 1 DESC LIMIT 5
|
||||
) as foo ORDER BY 1;
|
||||
|
||||
-- intermediate result pruning should still work
|
||||
SET citus.log_intermediate_results TO TRUE;
|
||||
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH dest_table_cte AS (
|
||||
SELECT a
|
||||
FROM dest_table
|
||||
),
|
||||
joined_source_table_cte_1 AS (
|
||||
SELECT b, a
|
||||
FROM source_table
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
),
|
||||
joined_source_table_cte_2 AS (
|
||||
SELECT b, a
|
||||
FROM joined_source_table_cte_1
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
)
|
||||
SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL))
|
||||
FROM dest_table_cte
|
||||
INNER JOIN joined_source_table_cte_2 USING (a)
|
||||
) inner_query;
|
||||
|
||||
-- confirm that the pruning works well when using round-robin as well
|
||||
SET citus.task_assignment_policy to 'round-robin';
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
WITH dest_table_cte AS (
|
||||
SELECT a
|
||||
FROM dest_table
|
||||
),
|
||||
joined_source_table_cte_1 AS (
|
||||
SELECT b, a
|
||||
FROM source_table
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
),
|
||||
joined_source_table_cte_2 AS (
|
||||
SELECT b, a
|
||||
FROM joined_source_table_cte_1
|
||||
INNER JOIN dest_table_cte USING (a)
|
||||
)
|
||||
SELECT SUM(b) OVER (PARTITION BY coalesce(a, NULL))
|
||||
FROM dest_table_cte
|
||||
INNER JOIN joined_source_table_cte_2 USING (a)
|
||||
) inner_query;
|
||||
|
||||
SET citus.task_assignment_policy to DEFAULT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
SET citus.log_intermediate_results TO DEFAULT;
|
||||
|
||||
-- insert into is definitely not allowed
|
||||
INSERT INTO dest_table (a, b)
|
||||
|
@ -57,4 +110,4 @@ INSERT INTO dest_table (a, b)
|
|||
|
||||
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
|
||||
UPDATE pg_dist_node SET noderole = 'primary';
|
||||
DROP TABLE dest_table;
|
||||
DROP TABLE source_table, dest_table;
|
||||
|
|
Loading…
Reference in New Issue