Fix error in distributed queries when shards are on the coordinator

pull/3308/head
Marco Slot 2019-12-14 13:38:16 +01:00
parent 2349f838a1
commit b37ef0e394
5 changed files with 183 additions and 1 deletions

View File

@ -75,6 +75,7 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */

View File

@ -4937,7 +4937,7 @@ GreedyAssignTaskList(List *taskList)
uint32 taskCount = list_length(taskList);
/* get the worker node list and sort the list */
List *workerNodeList = ActiveReadableWorkerNodeList();
List *workerNodeList = ActiveReadableNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*

View File

@ -0,0 +1,118 @@
-- Test queries on a distributed table with shards on the coordinator
CREATE SCHEMA coordinator_shouldhaveshards;
SET search_path TO coordinator_shouldhaveshards;
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
----------
1
(1 row)
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column?
----------
1
(1 row)
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x', colocate_with := 'none');
create_distributed_table
--------------------------
(1 row)
SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
WHERE logicalrelid = 'test'::regclass AND groupid = 0;
count
-------
2
(1 row)
-- INSERT..SELECT with COPY under the covers
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
-- router queries execute locally
INSERT INTO test VALUES (1, 1);
SELECT y FROM test WHERE x = 1;
y
---
1
(1 row)
-- multi-shard queries connect to localhost
SELECT count(*) FROM test;
count
-------
100
(1 row)
WITH a AS (SELECT * FROM test) SELECT count(*) FROM test;
count
-------
100
(1 row)
-- multi-shard queries in transaction blocks execute locally
BEGIN;
SELECT y FROM test WHERE x = 1;
y
---
1
(1 row)
SELECT count(*) FROM test;
count
-------
100
(1 row)
END;
BEGIN;
SELECT y FROM test WHERE x = 1;
y
---
1
(1 row)
SELECT count(*) FROM test;
count
-------
100
(1 row)
END;
-- DDL connects to locahost
ALTER TABLE test ADD COLUMN z int;
-- DDL after local execution
BEGIN;
SELECT y FROM test WHERE x = 1;
y
---
1
(1 row)
ALTER TABLE test DROP COLUMN z;
ERROR: cannot execute command because a local execution has already been done in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;
BEGIN;
ALTER TABLE test DROP COLUMN z;
SELECT y FROM test WHERE x = 1;
y
---
1
(1 row)
END;
DELETE FROM test;
DROP TABLE test;
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?
----------
1
(1 row)

View File

@ -271,6 +271,7 @@ test: multi_replicate_reference_table
test: multi_reference_table
test: foreign_key_to_reference_table
test: replicate_reference_tables_to_coordinator
test: coordinator_shouldhaveshards
test: remove_coordinator

View File

@ -0,0 +1,62 @@
-- Test queries on a distributed table with shards on the coordinator
CREATE SCHEMA coordinator_shouldhaveshards;
SET search_path TO coordinator_shouldhaveshards;
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x', colocate_with := 'none');
SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
WHERE logicalrelid = 'test'::regclass AND groupid = 0;
-- INSERT..SELECT with COPY under the covers
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
-- router queries execute locally
INSERT INTO test VALUES (1, 1);
SELECT y FROM test WHERE x = 1;
-- multi-shard queries connect to localhost
SELECT count(*) FROM test;
WITH a AS (SELECT * FROM test) SELECT count(*) FROM test;
-- multi-shard queries in transaction blocks execute locally
BEGIN;
SELECT y FROM test WHERE x = 1;
SELECT count(*) FROM test;
END;
BEGIN;
SELECT y FROM test WHERE x = 1;
SELECT count(*) FROM test;
END;
-- DDL connects to locahost
ALTER TABLE test ADD COLUMN z int;
-- DDL after local execution
BEGIN;
SELECT y FROM test WHERE x = 1;
ALTER TABLE test DROP COLUMN z;
ROLLBACK;
BEGIN;
ALTER TABLE test DROP COLUMN z;
SELECT y FROM test WHERE x = 1;
END;
DELETE FROM test;
DROP TABLE test;
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);