diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 6a459c792..053d1292f 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -81,13 +81,16 @@ worker_split_copy(PG_FUNCTION_ARGS) shardIntervalToSplitCopy, splitCopyInfoList); - char *sourceShardToCopyName = generate_qualified_relation_name( - shardIntervalToSplitCopy->relationId); + Oid sourceShardToCopySchemaOId = get_rel_namespace(shardIntervalToSplitCopy->relationId); + char *sourceShardToCopySchemaName = get_namespace_name(sourceShardToCopySchemaOId); + char *sourceShardToCopyName = get_rel_name(shardIntervalToSplitCopy->relationId); AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy); + char *sourceShardToCopyQualifiedName = quote_qualified_identifier(sourceShardToCopySchemaName, + sourceShardToCopyName); StringInfo selectShardQueryForCopy = makeStringInfo(); appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", sourceShardToCopyName); + "SELECT * FROM %s;", sourceShardToCopyQualifiedName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index c6665e27e..8adbb5f69 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -71,24 +71,42 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +SELECT COUNT(*) FROM sensors; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM reference_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM colocated_dist_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + -- END: Load data into tables. -- BEGIN : Display current state. -- TODO(niupre): Can we refactor this to be a function? -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) ORDER BY logicalrelid, shardminvalue::BIGINT; - shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport --------------------------------------------------------------------- - 8981000 | sensors | -2147483648 | -1 | localhost | 57637 | 8610000 - 8981001 | sensors | 0 | 2147483647 | localhost | 57638 | 8610001 - 8981003 | colocated_dist_table | -2147483648 | -1 | localhost | 57637 | 8610004 - 8981004 | colocated_dist_table | 0 | 2147483647 | localhost | 57638 | 8610005 - 8981005 | table_with_index_rep_identity | -2147483648 | -1 | localhost | 57637 | 8610006 - 8981006 | table_with_index_rep_identity | 0 | 2147483647 | localhost | 57638 | 8610007 + 8981000 | sensors | -2147483648 | -1 | localhost | 57637 + 8981001 | sensors | 0 | 2147483647 | localhost | 57638 + 8981003 | colocated_dist_table | -2147483648 | -1 | localhost | 57637 + 8981004 | colocated_dist_table | 0 | 2147483647 | localhost | 57638 + 8981005 | table_with_index_rep_identity | -2147483648 | -1 | localhost | 57637 + 8981006 | table_with_index_rep_identity | 0 | 2147483647 | localhost | 57638 (6 rows) \c - - - :worker_1_port @@ -224,30 +242,30 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- END : Move a shard post split. -- BEGIN : Display current state. -- TODO(niupre): Can we refactor this to be a function? -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) ORDER BY logicalrelid, shardminvalue::BIGINT; - shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport --------------------------------------------------------------------- - 8981007 | sensors | -2147483648 | -1073741824 | localhost | 57638 | 136 - 8981008 | sensors | -1073741823 | -1 | localhost | 57638 | 122 - 8981013 | sensors | 0 | 536870911 | localhost | 57637 | 127 - 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 | 128 - 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 | 129 - 8981009 | colocated_dist_table | -2147483648 | -1073741824 | localhost | 57638 | 137 - 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 | 124 - 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 | 130 - 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 | 131 - 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 | 132 - 8981011 | table_with_index_rep_identity | -2147483648 | -1073741824 | localhost | 57638 | 138 - 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 | 126 - 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 | 133 - 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 | 134 - 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 | 135 + 8981007 | sensors | -2147483648 | -1073741824 | localhost | 57638 + 8981008 | sensors | -1073741823 | -1 | localhost | 57638 + 8981013 | sensors | 0 | 536870911 | localhost | 57637 + 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 + 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 + 8981009 | colocated_dist_table | -2147483648 | -1073741824 | localhost | 57638 + 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 + 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 + 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 + 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 + 8981011 | table_with_index_rep_identity | -2147483648 | -1073741824 | localhost | 57638 + 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 + 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 + 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 + 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 (15 rows) \c - - - :worker_1_port @@ -377,36 +395,56 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) SET search_path TO "citus_split_test_schema"; -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) ORDER BY logicalrelid, shardminvalue::BIGINT; - shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport | placementid + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport --------------------------------------------------------------------- - 8981031 | sensors | -2147483648 | -2100000000 | localhost | 57637 | 139 - 8981032 | sensors | -2099999999 | -1073741824 | localhost | 57638 | 140 - 8981008 | sensors | -1073741823 | -1 | localhost | 57638 | 122 - 8981013 | sensors | 0 | 536870911 | localhost | 57637 | 127 - 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 | 128 - 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 | 129 - 8981033 | colocated_dist_table | -2147483648 | -2100000000 | localhost | 57637 | 141 - 8981034 | colocated_dist_table | -2099999999 | -1073741824 | localhost | 57638 | 142 - 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 | 124 - 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 | 130 - 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 | 131 - 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 | 132 - 8981035 | table_with_index_rep_identity | -2147483648 | -2100000000 | localhost | 57637 | 143 - 8981036 | table_with_index_rep_identity | -2099999999 | -1073741824 | localhost | 57638 | 144 - 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 | 126 - 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 | 133 - 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 | 134 - 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 | 135 + 8981031 | sensors | -2147483648 | -2100000000 | localhost | 57637 + 8981032 | sensors | -2099999999 | -1073741824 | localhost | 57638 + 8981008 | sensors | -1073741823 | -1 | localhost | 57638 + 8981013 | sensors | 0 | 536870911 | localhost | 57637 + 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 + 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 + 8981033 | colocated_dist_table | -2147483648 | -2100000000 | localhost | 57637 + 8981034 | colocated_dist_table | -2099999999 | -1073741824 | localhost | 57638 + 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 + 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 + 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 + 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 + 8981035 | table_with_index_rep_identity | -2147483648 | -2100000000 | localhost | 57637 + 8981036 | table_with_index_rep_identity | -2099999999 | -1073741824 | localhost | 57638 + 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 + 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 + 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 + 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 (18 rows) -- END: Split second time on another schema +-- BEGIN: Validate Data Count +SELECT COUNT(*) FROM sensors; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM reference_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM colocated_dist_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +-- END: Validate Data Count --BEGIN : Cleanup \c - postgres - :master_port DROP SCHEMA "citus_split_test_schema" CASCADE; diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 87bd54790..52f93d5c1 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -1,6 +1,6 @@ CREATE SCHEMA worker_split_copy_test; SET search_path TO worker_split_copy_test; -SET citus.shard_count TO 1; +SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81070000; -- BEGIN: Create distributed table and insert data. @@ -11,7 +11,7 @@ SELECT create_distributed_table('"test !/ \n _""dist_123_table"', 'id'); (1 row) -INSERT INTO "test !/ \n _""dist_123_table" (id, value) (SELECT g.id, 'N' FROM generate_series(1, 100) AS g(id)); +INSERT INTO "test !/ \n _""dist_123_table" (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); -- END: Create distributed table and insert data. -- BEGIN: Switch to Worker1, Create target shards in worker for local 2-way split copy. \c - - - :worker_1_port @@ -20,6 +20,12 @@ CREATE TABLE worker_split_copy_test."test !/ \n _""dist_123_table_81070016"(id i -- End: Switch to Worker1, Create target shards in worker for local 2-way split copy. -- BEGIN: List row count for source shard and targets shard in Worker1. \c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070000"; + count +--------------------------------------------------------------------- + 510 +(1 row) + SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070015"; count --------------------------------------------------------------------- @@ -32,6 +38,13 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 0 (1 row) +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070001"; + count +--------------------------------------------------------------------- + 490 +(1 row) + -- END: List row count for source shard and targets shard in Worker1. -- BEGIN: Set worker_1_node and worker_2_node \c - - - :worker_1_port @@ -47,12 +60,12 @@ SELECT * from worker_split_copy( -- split copy info for split children 1 ROW(81070015, -- destination shard id -2147483648, -- split range begin - 1073741823, --split range end + -1073741824, --split range end :worker_1_node)::citus.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id - 1073741824, --split range begin - 2147483647, --split range end + -1073741823, --split range begin + -1, --split range end :worker_1_node)::citus.split_copy_info ] ); @@ -66,13 +79,13 @@ SELECT * from worker_split_copy( SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070015"; count --------------------------------------------------------------------- - 72 + 247 (1 row) SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016"; count --------------------------------------------------------------------- - 28 + 263 (1 row) -- END: List updated row count for local targets shard. diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index 58e4d3fc5..8d618c3c5 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -60,11 +60,15 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; + +SELECT COUNT(*) FROM sensors; +SELECT COUNT(*) FROM reference_table; +SELECT COUNT(*) FROM colocated_dist_table; -- END: Load data into tables. -- BEGIN : Display current state. -- TODO(niupre): Can we refactor this to be a function? -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid @@ -145,7 +149,7 @@ SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localho -- BEGIN : Display current state. -- TODO(niupre): Can we refactor this to be a function? -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid @@ -210,7 +214,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 'blocking'); SET search_path TO "citus_split_test_schema"; -SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport, placementid +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid INNER JOIN pg_dist_node node ON placement.groupid = node.groupid @@ -219,6 +223,12 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node ORDER BY logicalrelid, shardminvalue::BIGINT; -- END: Split second time on another schema +-- BEGIN: Validate Data Count +SELECT COUNT(*) FROM sensors; +SELECT COUNT(*) FROM reference_table; +SELECT COUNT(*) FROM colocated_dist_table; +-- END: Validate Data Count + --BEGIN : Cleanup \c - postgres - :master_port DROP SCHEMA "citus_split_test_schema" CASCADE; diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 5ac765781..324cc87c2 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -1,6 +1,6 @@ CREATE SCHEMA worker_split_copy_test; SET search_path TO worker_split_copy_test; -SET citus.shard_count TO 1; +SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 81070000; @@ -9,7 +9,7 @@ SET citus.next_shard_id TO 81070000; CREATE TABLE worker_split_copy_test."test !/ \n _""dist_123_table"(id int primary key, value char); SELECT create_distributed_table('"test !/ \n _""dist_123_table"', 'id'); -INSERT INTO "test !/ \n _""dist_123_table" (id, value) (SELECT g.id, 'N' FROM generate_series(1, 100) AS g(id)); +INSERT INTO "test !/ \n _""dist_123_table" (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); -- END: Create distributed table and insert data. @@ -21,8 +21,12 @@ CREATE TABLE worker_split_copy_test."test !/ \n _""dist_123_table_81070016"(id i -- BEGIN: List row count for source shard and targets shard in Worker1. \c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070000"; SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070015"; SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016"; + +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070001"; -- END: List row count for source shard and targets shard in Worker1. -- BEGIN: Set worker_1_node and worker_2_node @@ -40,12 +44,12 @@ SELECT * from worker_split_copy( -- split copy info for split children 1 ROW(81070015, -- destination shard id -2147483648, -- split range begin - 1073741823, --split range end + -1073741824, --split range end :worker_1_node)::citus.split_copy_info, -- split copy info for split children 2 ROW(81070016, --destination shard id - 1073741824, --split range begin - 2147483647, --split range end + -1073741823, --split range begin + -1, --split range end :worker_1_node)::citus.split_copy_info ] );