mirror of https://github.com/citusdata/citus.git
Consider coordinator in intermediate result optimization
It seems that we were not considering the case where coordinator was added to the cluster as a worker in the optimization of intermediate results. This could lead to errors when coordinator was added as a worker.pull/4628/head
parent
c0f2817b70
commit
24e60b44a1
|
@ -158,7 +158,6 @@ distributed_planner(Query *parse,
|
||||||
}
|
}
|
||||||
|
|
||||||
int rteIdCounter = 1;
|
int rteIdCounter = 1;
|
||||||
|
|
||||||
DistributedPlanningContext planContext = {
|
DistributedPlanningContext planContext = {
|
||||||
.query = parse,
|
.query = parse,
|
||||||
.cursorOptions = cursorOptions,
|
.cursorOptions = cursorOptions,
|
||||||
|
|
|
@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
||||||
List *subPlanList = distributedPlan->subPlanList;
|
List *subPlanList = distributedPlan->subPlanList;
|
||||||
ListCell *subPlanCell = NULL;
|
ListCell *subPlanCell = NULL;
|
||||||
int workerNodeCount = ActiveReadableNonCoordinatorNodeCount();
|
int workerNodeCount = list_length(ActiveReadableNodeList());
|
||||||
|
|
||||||
foreach(subPlanCell, usedSubPlanNodeList)
|
foreach(subPlanCell, usedSubPlanNodeList)
|
||||||
{
|
{
|
||||||
|
@ -272,7 +272,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
|
||||||
static void
|
static void
|
||||||
AppendAllWorkerNodes(IntermediateResultsHashEntry *entry)
|
AppendAllWorkerNodes(IntermediateResultsHashEntry *entry)
|
||||||
{
|
{
|
||||||
List *workerNodeList = ActiveReadableNonCoordinatorNodeList();
|
List *workerNodeList = ActiveReadableNodeList();
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
|
|
|
@ -778,6 +778,107 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
-- issue 4508 table_1 and table_2 are used to test
|
||||||
|
-- some edge cases around intermediate result pruning
|
||||||
|
CREATE TABLE table_1 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table_2 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_2', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
||||||
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text)
|
||||||
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text)
|
||||||
|
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text)
|
||||||
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text)
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages to debug1;
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||||
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||||
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||||
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
||||||
|
NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||||
|
NOTICE: executing the copy locally for shard xxxxx
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
|
||||||
|
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint
|
||||||
|
NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE
|
||||||
|
@ -788,7 +889,7 @@ DROP TABLE ref;
|
||||||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
||||||
DROP TABLE test_append_table;
|
DROP TABLE test_append_table;
|
||||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||||
NOTICE: drop cascades to 13 other objects
|
NOTICE: drop cascades to 19 other objects
|
||||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -3,10 +3,10 @@ SET search_path TO local_shard_copy;
|
||||||
SET client_min_messages TO DEBUG;
|
SET client_min_messages TO DEBUG;
|
||||||
SET citus.next_shard_id TO 1570000;
|
SET citus.next_shard_id TO 1570000;
|
||||||
SET citus.replicate_reference_tables_on_activate TO off;
|
SET citus.replicate_reference_tables_on_activate TO off;
|
||||||
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0);
|
||||||
master_add_node
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
32
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
|
|
|
@ -0,0 +1,185 @@
|
||||||
|
CREATE SCHEMA mx_coordinator_shouldhaveshards;
|
||||||
|
SET search_path TO mx_coordinator_shouldhaveshards;
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
SET citus.replication_model TO streaming;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- issue 4508 table_1 and table_2 are used to test some edge cases
|
||||||
|
-- around intermediate result pruning
|
||||||
|
CREATE TABLE table_1 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key', colocate_with := 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table_2 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
||||||
|
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
|
set citus.log_intermediate_results TO ON;
|
||||||
|
set client_min_messages to debug1;
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
|
||||||
|
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO mx_coordinator_shouldhaveshards;
|
||||||
|
set citus.log_intermediate_results TO ON;
|
||||||
|
set client_min_messages to debug1;
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
|
||||||
|
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE;
|
||||||
|
NOTICE: drop cascades to 6 other objects
|
||||||
|
DETAIL: drop cascades to table mx_coordinator_shouldhaveshards.table_1
|
||||||
|
drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130052
|
||||||
|
drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130055
|
||||||
|
drop cascades to table mx_coordinator_shouldhaveshards.table_2
|
||||||
|
drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130056
|
||||||
|
drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130059
|
||||||
|
SELECT master_remove_node('localhost', :master_port);
|
||||||
|
master_remove_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
|
@ -26,6 +26,7 @@ test: multi_mx_hide_shard_names
|
||||||
test: multi_mx_add_coordinator
|
test: multi_mx_add_coordinator
|
||||||
test: multi_mx_modifications_to_reference_tables
|
test: multi_mx_modifications_to_reference_tables
|
||||||
test: multi_mx_partitioning
|
test: multi_mx_partitioning
|
||||||
|
test: mx_coordinator_shouldhaveshards
|
||||||
test: multi_mx_copy_data multi_mx_router_planner
|
test: multi_mx_copy_data multi_mx_router_planner
|
||||||
test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
|
test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
|
||||||
test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
|
test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
|
||||||
|
|
|
@ -323,6 +323,50 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
|
|
||||||
|
-- issue 4508 table_1 and table_2 are used to test
|
||||||
|
-- some edge cases around intermediate result pruning
|
||||||
|
CREATE TABLE table_1 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key');
|
||||||
|
|
||||||
|
CREATE TABLE table_2 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_2', 'key');
|
||||||
|
|
||||||
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
||||||
|
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
|
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages to debug1;
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ SET client_min_messages TO DEBUG;
|
||||||
SET citus.next_shard_id TO 1570000;
|
SET citus.next_shard_id TO 1570000;
|
||||||
SET citus.replicate_reference_tables_on_activate TO off;
|
SET citus.replicate_reference_tables_on_activate TO off;
|
||||||
|
|
||||||
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0);
|
||||||
|
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
CREATE SCHEMA mx_coordinator_shouldhaveshards;
|
||||||
|
SET search_path TO mx_coordinator_shouldhaveshards;
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
SET citus.replication_model TO streaming;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
||||||
|
|
||||||
|
-- issue 4508 table_1 and table_2 are used to test some edge cases
|
||||||
|
-- around intermediate result pruning
|
||||||
|
CREATE TABLE table_1 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_1', 'key', colocate_with := 'none');
|
||||||
|
|
||||||
|
CREATE TABLE table_2 (key int, value text);
|
||||||
|
SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
|
||||||
|
|
||||||
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
||||||
|
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
|
|
||||||
|
set citus.log_intermediate_results TO ON;
|
||||||
|
set client_min_messages to debug1;
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO mx_coordinator_shouldhaveshards;
|
||||||
|
|
||||||
|
set citus.log_intermediate_results TO ON;
|
||||||
|
set client_min_messages to debug1;
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
|
INSERT INTO table_1 SELECT count(*),
|
||||||
|
key
|
||||||
|
FROM a JOIN table_2 USING (key)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
||||||
|
|
||||||
|
WITH stats AS (
|
||||||
|
SELECT count(key) m FROM table_1
|
||||||
|
),
|
||||||
|
inserts AS (
|
||||||
|
INSERT INTO table_2
|
||||||
|
SELECT key, count(*)
|
||||||
|
FROM table_1
|
||||||
|
WHERE key >= (SELECT m FROM stats)
|
||||||
|
GROUP BY key
|
||||||
|
HAVING count(*) <= (SELECT m FROM stats)
|
||||||
|
LIMIT 1
|
||||||
|
RETURNING *
|
||||||
|
) SELECT count(*) FROM inserts;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
|
|
||||||
|
DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE;
|
||||||
|
|
||||||
|
SELECT master_remove_node('localhost', :master_port);
|
Loading…
Reference in New Issue