From b37ef0e394bb56d21440061e0b3f5c66c8ffd101 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sat, 14 Dec 2019 13:38:16 +0100 Subject: [PATCH] Fix error in distributed queries when shards are on the coordinator --- .../distributed/executor/local_executor.c | 1 + .../planner/multi_physical_planner.c | 2 +- .../expected/coordinator_shouldhaveshards.out | 118 ++++++++++++++++++ src/test/regress/multi_schedule | 1 + .../sql/coordinator_shouldhaveshards.sql | 62 +++++++++ 5 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/coordinator_shouldhaveshards.out create mode 100644 src/test/regress/sql/coordinator_shouldhaveshards.sql diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index ea0b82fad..266c10ba6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -75,6 +75,7 @@ #include "distributed/citus_custom_scan.h" #include "distributed/local_executor.h" #include "distributed/multi_executor.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" /* to access LogRemoteCommands */ diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 2f6af1bd1..703dfacf3 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4937,7 +4937,7 @@ GreedyAssignTaskList(List *taskList) uint32 taskCount = list_length(taskList); /* get the worker node list and sort the list */ - List *workerNodeList = ActiveReadableWorkerNodeList(); + List *workerNodeList = ActiveReadableNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out 
b/src/test/regress/expected/coordinator_shouldhaveshards.out new file mode 100644 index 000000000..bc9d0e8eb --- /dev/null +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -0,0 +1,118 @@ +-- Test queries on a distributed table with shards on the coordinator +CREATE SCHEMA coordinator_shouldhaveshards; +SET search_path TO coordinator_shouldhaveshards; +-- idempotently add node to allow this test to run without add_coordinator +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +---------- + 1 +(1 row) + +RESET client_min_messages; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? +---------- + 1 +(1 row) + +SET citus.shard_replication_factor TO 1; +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test','x', colocate_with := 'none'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) +WHERE logicalrelid = 'test'::regclass AND groupid = 0; + count +------- + 2 +(1 row) + +-- INSERT..SELECT with COPY under the covers +INSERT INTO test SELECT s,s FROM generate_series(2,100) s; +-- router queries execute locally +INSERT INTO test VALUES (1, 1); +SELECT y FROM test WHERE x = 1; + y +--- + 1 +(1 row) + +-- multi-shard queries connect to localhost +SELECT count(*) FROM test; + count +------- + 100 +(1 row) + +WITH a AS (SELECT * FROM test) SELECT count(*) FROM test; + count +------- + 100 +(1 row) + +-- multi-shard queries in transaction blocks execute locally +BEGIN; +SELECT y FROM test WHERE x = 1; + y +--- + 1 +(1 row) + +SELECT count(*) FROM test; + count +------- + 100 +(1 row) + +END; +BEGIN; +SELECT y FROM test WHERE x = 1; + y +--- + 1 +(1 row) + +SELECT count(*) FROM test; + count +------- + 100 +(1 row) + +END; +-- DDL connects to localhost +ALTER TABLE test ADD COLUMN z int; +-- DDL after local execution +BEGIN; 
+SELECT y FROM test WHERE x = 1; + y +--- + 1 +(1 row) + +ALTER TABLE test DROP COLUMN z; +ERROR: cannot execute command because a local execution has already been done in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +ROLLBACK; +BEGIN; +ALTER TABLE test DROP COLUMN z; +SELECT y FROM test WHERE x = 1; + y +--- + 1 +(1 row) + +END; +DELETE FROM test; +DROP TABLE test; +DROP SCHEMA coordinator_shouldhaveshards CASCADE; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + ?column? +---------- + 1 +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8e0dbb715..4542b18da 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -271,6 +271,7 @@ test: multi_replicate_reference_table test: multi_reference_table test: foreign_key_to_reference_table test: replicate_reference_tables_to_coordinator +test: coordinator_shouldhaveshards test: remove_coordinator diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql new file mode 100644 index 000000000..42fe96bec --- /dev/null +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -0,0 +1,62 @@ +-- Test queries on a distributed table with shards on the coordinator + +CREATE SCHEMA coordinator_shouldhaveshards; +SET search_path TO coordinator_shouldhaveshards; + +-- idempotently add node to allow this test to run without add_coordinator +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +SET citus.shard_replication_factor TO 1; + +CREATE TABLE test (x int, y int); +SELECT 
create_distributed_table('test','x', colocate_with := 'none'); + +SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) +WHERE logicalrelid = 'test'::regclass AND groupid = 0; + +-- INSERT..SELECT with COPY under the covers +INSERT INTO test SELECT s,s FROM generate_series(2,100) s; + +-- router queries execute locally +INSERT INTO test VALUES (1, 1); +SELECT y FROM test WHERE x = 1; + +-- multi-shard queries connect to localhost +SELECT count(*) FROM test; +WITH a AS (SELECT * FROM test) SELECT count(*) FROM test; + +-- multi-shard queries in transaction blocks execute locally +BEGIN; +SELECT y FROM test WHERE x = 1; +SELECT count(*) FROM test; +END; + +BEGIN; +SELECT y FROM test WHERE x = 1; +SELECT count(*) FROM test; +END; + +-- DDL connects to localhost +ALTER TABLE test ADD COLUMN z int; + +-- DDL after local execution +BEGIN; +SELECT y FROM test WHERE x = 1; +ALTER TABLE test DROP COLUMN z; +ROLLBACK; + +BEGIN; +ALTER TABLE test DROP COLUMN z; +SELECT y FROM test WHERE x = 1; +END; + +DELETE FROM test; +DROP TABLE test; + +DROP SCHEMA coordinator_shouldhaveshards CASCADE; + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);