diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index e3cc32625..32771d68b 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -468,29 +468,19 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 		/* either the local or the remote should be non-nil */
 		Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
 
-		if (list_length(task->taskPlacementList) == 1)
+		if (localTaskPlacementList == NIL)
 		{
-			/*
-			 * At this point, the task has a single placement (e.g,. anchor shard
-			 * is distributed table's shard). So, it is either added to local or
-			 * remote taskList.
-			 */
-			if (localTaskPlacementList == NIL)
-			{
-				*remoteTaskList = lappend(*remoteTaskList, task);
-			}
-			else
-			{
-				*localTaskList = lappend(*localTaskList, task);
-			}
+			*remoteTaskList = lappend(*remoteTaskList, task);
+		}
+		else if (remoteTaskPlacementList == NIL)
+		{
+			*localTaskList = lappend(*localTaskList, task);
 		}
 		else
 		{
 			/*
-			 * At this point, we're dealing with reference tables or intermediate
-			 * results where the task has placements on both local and remote
-			 * nodes. We always prefer to use local placement, and require remote
-			 * placements only for modifications.
+			 * At this point, we're dealing with a task that has placements on both
+			 * local and remote nodes.
 			 */
 			task->partiallyLocalOrRemote = true;
 
@@ -505,6 +495,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 			}
 			else
 			{
+				/* since shard replication factor > 1, we should have at least 1 remote task */
+				Assert(remoteTaskPlacementList != NIL);
 				Task *remoteTask = copyObject(task);
 				remoteTask->taskPlacementList = remoteTaskPlacementList;
 
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index 38f31ce85..1d2819309 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -14,6 +14,7 @@
 
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/multi_server_executor.h"
+#include "distributed/listutils.h"
 #include "utils/datum.h"
 
 
@@ -73,6 +74,18 @@ CitusSetTag(Node *node, int tag)
 	} \
 	while (0)
 
+#define COPY_STRING_LIST(fldname) \
+	do { \
+		char *curString = NULL; \
+		List *newList = NIL; \
+		foreach_ptr(curString, from->fldname) { \
+			char *newString = curString ? pstrdup(curString) : (char *) NULL; \
+			newList = lappend(newList, newString); \
+		} \
+		newnode->fldname = newList; \
+	} \
+	while (0)
+
 static void CopyTaskQuery(Task *newnode, Task *from);
 
 static void
@@ -271,13 +284,13 @@ CopyTaskQuery(Task *newnode, Task *from)
 
 		case TASK_QUERY_TEXT_PER_PLACEMENT:
 		{
-			COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
+			COPY_STRING_LIST(taskQuery.data.perPlacementQueryStrings);
 			break;
 		}
 
 		case TASK_QUERY_TEXT_LIST:
 		{
-			COPY_NODE_FIELD(taskQuery.data.queryStringList);
+			COPY_STRING_LIST(taskQuery.data.queryStringList);
 			break;
 		}
 
diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out
index 85cb4b95c..191f728c0 100644
--- a/src/test/regress/expected/coordinator_shouldhaveshards.out
+++ b/src/test/regress/expected/coordinator_shouldhaveshards.out
@@ -375,13 +375,48 @@ NOTICE:  executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co
    100 | 3 | 2 | 3 | 2 |     1
 (1 row)
 
+-- issue #3801
+SET citus.shard_replication_factor TO 2;
+CREATE TABLE dist_table(a int);
+ERROR:  relation "dist_table" already exists
+SELECT create_distributed_table('dist_table', 'a');
+NOTICE:  Copying data from local table...
+NOTICE:  copying the data has completed
+DETAIL:  The local data in the table is no longer visible, but is still on disk.
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+BEGIN;
+-- this will use perPlacementQueryStrings, make sure it works correctly when
+-- the task is copied
+INSERT INTO dist_table SELECT a + 1 FROM dist_table;
+ROLLBACK;
+BEGIN;
+SET citus.shard_replication_factor TO 2;
+CREATE TABLE dist_table1(a int);
+-- this will use queryStringList, make sure it works correctly when
+-- the task is copied
+SELECT create_distributed_table('dist_table1', 'a');
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+ROLLBACK;
 RESET citus.enable_cte_inlining;
 DELETE FROM test;
 DROP TABLE test;
+DROP TABLE dist_table;
 DROP SCHEMA coordinator_shouldhaveshards CASCADE;
-NOTICE:  drop cascades to 4 other objects
-DETAIL:  drop cascades to table dist_table
-drop cascades to table ref
+NOTICE:  drop cascades to 3 other objects
+DETAIL:  drop cascades to table ref
 drop cascades to table ref_1503016
 drop cascades to table local
 SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out
index 81757d72b..57dd97344 100644
--- a/src/test/regress/expected/multi_reference_table.out
+++ b/src/test/regress/expected/multi_reference_table.out
@@ -1639,6 +1639,15 @@ ORDER BY 1;
          ->  Seq Scan on colocated_table_test_1250005 colocated_table_test
 (25 rows)
 
+WITH a as (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2)
+SELECT ct.value_1, count(*) FROM colocated_table_test ct join a on ct.value_1 = a.value_2
+WHERE exists (select * from a)
+GROUP BY 1 ORDER BY 1;
+ value_1 | count
+---------------------------------------------------------------------
+       2 |     5
+(1 row)
+
 -- clean up tables, ...
 SET client_min_messages TO ERROR;
 DROP SEQUENCE example_ref_value_seq;
diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql
index 4b2a03332..566d54254 100644
--- a/src/test/regress/sql/coordinator_shouldhaveshards.sql
+++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql
@@ -155,11 +155,29 @@ SELECT * FROM ref JOIN local ON (a = x);
 -- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
 WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
 
+-- issue #3801
+SET citus.shard_replication_factor TO 2;
+CREATE TABLE dist_table(a int);
+SELECT create_distributed_table('dist_table', 'a');
+BEGIN;
+-- this will use perPlacementQueryStrings, make sure it works correctly when
+-- the task is copied
+INSERT INTO dist_table SELECT a + 1 FROM dist_table;
+ROLLBACK;
+
+BEGIN;
+SET citus.shard_replication_factor TO 2;
+CREATE TABLE dist_table1(a int);
+-- this will use queryStringList, make sure it works correctly when
+-- the task is copied
+SELECT create_distributed_table('dist_table1', 'a');
+ROLLBACK;
 
 RESET citus.enable_cte_inlining;
 
 DELETE FROM test;
 DROP TABLE test;
+DROP TABLE dist_table;
 DROP SCHEMA coordinator_shouldhaveshards CASCADE;
diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql
index 76b5f143c..39d75ef06 100644
--- a/src/test/regress/sql/multi_reference_table.sql
+++ b/src/test/regress/sql/multi_reference_table.sql
@@ -1012,6 +1012,11 @@ EXPLAIN (COSTS OFF) SELECT value_1, count(*) FROM colocated_table_test GROUP BY
 HAVING (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2) > 0
 ORDER BY 1;
 
+WITH a as (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2)
+SELECT ct.value_1, count(*) FROM colocated_table_test ct join a on ct.value_1 = a.value_2
+WHERE exists (select * from a)
+GROUP BY 1 ORDER BY 1;
+
 -- clean up tables, ...
 SET client_min_messages TO ERROR;
 DROP SEQUENCE example_ref_value_seq;
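
Note on the local_executor.c hunks: the rewrite drops the old single-placement special case in favor of a three-way classification. If every placement of a task is remote, the task goes to the remote list unchanged; if every placement is local, it goes to the local list unchanged; only a task with placements on both sides is split, which is where the task copy happens. The following is a minimal model of that branching, with simplified, hypothetical types (FakeTask and ClassifyTask are illustrative names only; the real code operates on Citus's Task and its shard placement lists):

    #include "postgres.h"
    #include "nodes/pg_list.h"

    typedef struct FakeTask
    {
        List *localPlacements;       /* placements on the local node */
        List *remotePlacements;      /* placements on other nodes */
        bool partiallyLocalOrRemote; /* true when the task must be split */
    } FakeTask;

    static void
    ClassifyTask(FakeTask *task, List **localTasks, List **remoteTasks)
    {
        /* either the local or the remote side must be non-empty */
        Assert(!(task->localPlacements == NIL && task->remotePlacements == NIL));

        if (task->localPlacements == NIL)
        {
            /* all placements are remote: execute the task remotely as-is */
            *remoteTasks = lappend(*remoteTasks, task);
        }
        else if (task->remotePlacements == NIL)
        {
            /* all placements are local: execute the task locally as-is */
            *localTasks = lappend(*localTasks, task);
        }
        else
        {
            /*
             * Placements on both sides, e.g. replication factor > 1 with a
             * coordinator that holds shards: the task has to be split into
             * a local part and a remote part.
             */
            task->partiallyLocalOrRemote = true;
        }
    }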
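
Note on the citus_copyfuncs.c hunk: COPY_NODE_FIELD expands to copyObject(), which reads a NodeTag from the start of every list element. The elements of taskQuery.data.queryStringList and taskQuery.data.perPlacementQueryStrings are plain char * strings rather than Nodes, so copying them through copyObject() misreads their bytes as a node tag; this appears to be the failure exercised by the new issue #3801 tests, since the split path in ExtractLocalAndRemoteTasks copies tasks whenever a shard has both a local and a remote placement. A deep copy of a string list instead has to pstrdup() each element, which is what the new COPY_STRING_LIST macro does via Citus's foreach_ptr (hence the added listutils.h include). Below is a minimal function-style sketch of the same logic using PostgreSQL's stock list API (CopyStringList is a hypothetical name, not part of the patch):

    #include "postgres.h"
    #include "nodes/pg_list.h"

    static List *
    CopyStringList(List *from)
    {
        List *newList = NIL;
        ListCell *cell = NULL;

        foreach(cell, from)
        {
            char *curString = (char *) lfirst(cell);

            /* pstrdup() copies the bytes into the current memory context */
            char *newString = curString ? pstrdup(curString) : NULL;

            newList = lappend(newList, newString);
        }

        return newList;
    }

The resulting list owns its strings outright, so freeing or rewriting the original task's query strings can no longer corrupt the copied task.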