mirror of https://github.com/citusdata/citus.git
Fix task copy and appending empty task in ExtractLocalAndRemoteTasks (#3802)
* Do not append an empty task in ExtractLocalAndRemoteTasks. ExtractLocalAndRemoteTasks extracts the local and remote tasks. If we do not have a local task, the localTaskPlacementList will be NIL; in this case we should not append anything to the local task list. Previously we would first check whether a task contains a single placement or not; now we first check whether there is any local task before doing anything. * Fix copy of Task node. The Task node has a task query, which might contain a list of strings in its fields. We were using the postgres copyObject for these lists. Postgres assumes that each element of a list is a node type; if an element is not a node type, it will raise an error. As a solution, a new macro is introduced to copy a list of strings. pull/3805/head
parent
3fecf0b732
commit
cbda951395
|
@ -468,29 +468,19 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
/* either the local or the remote should be non-nil */
|
/* either the local or the remote should be non-nil */
|
||||||
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
|
Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
|
||||||
|
|
||||||
if (list_length(task->taskPlacementList) == 1)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* At this point, the task has a single placement (e.g,. anchor shard
|
|
||||||
* is distributed table's shard). So, it is either added to local or
|
|
||||||
* remote taskList.
|
|
||||||
*/
|
|
||||||
if (localTaskPlacementList == NIL)
|
if (localTaskPlacementList == NIL)
|
||||||
{
|
{
|
||||||
*remoteTaskList = lappend(*remoteTaskList, task);
|
*remoteTaskList = lappend(*remoteTaskList, task);
|
||||||
}
|
}
|
||||||
else
|
else if (remoteTaskPlacementList == NIL)
|
||||||
{
|
{
|
||||||
*localTaskList = lappend(*localTaskList, task);
|
*localTaskList = lappend(*localTaskList, task);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* At this point, we're dealing with reference tables or intermediate
|
* At this point, we're dealing with a task that has placements on both
|
||||||
* results where the task has placements on both local and remote
|
* local and remote nodes.
|
||||||
* nodes. We always prefer to use local placement, and require remote
|
|
||||||
* placements only for modifications.
|
|
||||||
*/
|
*/
|
||||||
task->partiallyLocalOrRemote = true;
|
task->partiallyLocalOrRemote = true;
|
||||||
|
|
||||||
|
@ -505,6 +495,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
/* since shard replication factor > 1, we should have at least 1 remote task */
|
||||||
|
Assert(remoteTaskPlacementList != NIL);
|
||||||
Task *remoteTask = copyObject(task);
|
Task *remoteTask = copyObject(task);
|
||||||
remoteTask->taskPlacementList = remoteTaskPlacementList;
|
remoteTask->taskPlacementList = remoteTaskPlacementList;
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
|
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
#include "utils/datum.h"
|
#include "utils/datum.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -73,6 +74,18 @@ CitusSetTag(Node *node, int tag)
|
||||||
} \
|
} \
|
||||||
while (0)
|
while (0)
|
||||||
|
|
||||||
|
#define COPY_STRING_LIST(fldname) \
|
||||||
|
do { \
|
||||||
|
char *curString = NULL; \
|
||||||
|
List *newList = NIL; \
|
||||||
|
foreach_ptr(curString, from->fldname) { \
|
||||||
|
char *newString = curString ? pstrdup(curString) : (char *) NULL; \
|
||||||
|
newList = lappend(newList, newString); \
|
||||||
|
} \
|
||||||
|
newnode->fldname = newList; \
|
||||||
|
} \
|
||||||
|
while (0)
|
||||||
|
|
||||||
static void CopyTaskQuery(Task *newnode, Task *from);
|
static void CopyTaskQuery(Task *newnode, Task *from);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -271,13 +284,13 @@ CopyTaskQuery(Task *newnode, Task *from)
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_PER_PLACEMENT:
|
case TASK_QUERY_TEXT_PER_PLACEMENT:
|
||||||
{
|
{
|
||||||
COPY_NODE_FIELD(taskQuery.data.perPlacementQueryStrings);
|
COPY_STRING_LIST(taskQuery.data.perPlacementQueryStrings);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TASK_QUERY_TEXT_LIST:
|
case TASK_QUERY_TEXT_LIST:
|
||||||
{
|
{
|
||||||
COPY_NODE_FIELD(taskQuery.data.queryStringList);
|
COPY_STRING_LIST(taskQuery.data.queryStringList);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -375,13 +375,48 @@ NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co
|
||||||
100 | 3 | 2 | 3 | 2 | 1
|
100 | 3 | 2 | 3 | 2 | 1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- issue #3801
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE dist_table(a int);
|
||||||
|
ERROR: relation "dist_table" already exists
|
||||||
|
SELECT create_distributed_table('dist_table', 'a');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- this will use perPlacementQueryStrings, make sure it works correctly with
|
||||||
|
-- copying task
|
||||||
|
INSERT INTO dist_table SELECT a + 1 FROM dist_table;
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE dist_table1(a int);
|
||||||
|
-- this will use queryStringList, make sure it works correctly with
|
||||||
|
-- copying task
|
||||||
|
SELECT create_distributed_table('dist_table1', 'a');
|
||||||
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503023, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
||||||
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503025, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
||||||
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503026, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
||||||
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer)');SELECT worker_apply_shard_ddl_command (1503028, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
RESET citus.enable_cte_inlining;
|
RESET citus.enable_cte_inlining;
|
||||||
DELETE FROM test;
|
DELETE FROM test;
|
||||||
DROP TABLE test;
|
DROP TABLE test;
|
||||||
|
DROP TABLE dist_table;
|
||||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
NOTICE: drop cascades to 3 other objects
|
||||||
DETAIL: drop cascades to table dist_table
|
DETAIL: drop cascades to table ref
|
||||||
drop cascades to table ref
|
|
||||||
drop cascades to table ref_1503016
|
drop cascades to table ref_1503016
|
||||||
drop cascades to table local
|
drop cascades to table local
|
||||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||||
|
|
|
@ -1639,6 +1639,15 @@ ORDER BY 1;
|
||||||
-> Seq Scan on colocated_table_test_1250005 colocated_table_test
|
-> Seq Scan on colocated_table_test_1250005 colocated_table_test
|
||||||
(25 rows)
|
(25 rows)
|
||||||
|
|
||||||
|
WITH a as (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2)
|
||||||
|
SELECT ct.value_1, count(*) FROM colocated_table_test ct join a on ct.value_1 = a.value_2
|
||||||
|
WHERE exists (select * from a)
|
||||||
|
GROUP BY 1 ORDER BY 1;
|
||||||
|
value_1 | count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | 5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- clean up tables, ...
|
-- clean up tables, ...
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SEQUENCE example_ref_value_seq;
|
DROP SEQUENCE example_ref_value_seq;
|
||||||
|
|
|
@ -155,11 +155,29 @@ SELECT * FROM ref JOIN local ON (a = x);
|
||||||
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
||||||
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
||||||
|
|
||||||
|
-- issue #3801
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE dist_table(a int);
|
||||||
|
SELECT create_distributed_table('dist_table', 'a');
|
||||||
|
BEGIN;
|
||||||
|
-- this will use perPlacementQueryStrings, make sure it works correctly with
|
||||||
|
-- copying task
|
||||||
|
INSERT INTO dist_table SELECT a + 1 FROM dist_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE dist_table1(a int);
|
||||||
|
-- this will use queryStringList, make sure it works correctly with
|
||||||
|
-- copying task
|
||||||
|
SELECT create_distributed_table('dist_table1', 'a');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
RESET citus.enable_cte_inlining;
|
RESET citus.enable_cte_inlining;
|
||||||
|
|
||||||
DELETE FROM test;
|
DELETE FROM test;
|
||||||
DROP TABLE test;
|
DROP TABLE test;
|
||||||
|
DROP TABLE dist_table;
|
||||||
|
|
||||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||||
|
|
||||||
|
|
|
@ -1012,6 +1012,11 @@ EXPLAIN (COSTS OFF) SELECT value_1, count(*) FROM colocated_table_test GROUP BY
|
||||||
HAVING (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2) > 0
|
HAVING (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2) > 0
|
||||||
ORDER BY 1;
|
ORDER BY 1;
|
||||||
|
|
||||||
|
WITH a as (SELECT rt.value_2 FROM reference_table_test rt where rt.value_2 = 2)
|
||||||
|
SELECT ct.value_1, count(*) FROM colocated_table_test ct join a on ct.value_1 = a.value_2
|
||||||
|
WHERE exists (select * from a)
|
||||||
|
GROUP BY 1 ORDER BY 1;
|
||||||
|
|
||||||
-- clean up tables, ...
|
-- clean up tables, ...
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SEQUENCE example_ref_value_seq;
|
DROP SEQUENCE example_ref_value_seq;
|
||||||
|
|
Loading…
Reference in New Issue