diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index e5ea30f03..b6f146faf 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -415,17 +415,18 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL /* alias */, false /* inh */, false /* inFromCl */); + + List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; CopyFromState cstate = BeginCopyFrom_compat(pState, shard, NULL /* whereClause */, NULL /* fileName */, false /* is_program */, ReadFromLocalBufferCallback, NULL /* attlist (NULL is all columns) */, - list_make1(binaryFormatOption)); - resetStringInfo(localCopyOutState->fe_msgbuf); - + options); CopyFrom(cstate); EndCopyFrom(cstate); + resetStringInfo(localCopyOutState->fe_msgbuf); table_close(shard, NoLock); free_parsestate(pState); diff --git a/src/test/regress/expected/worker_split_binary_copy_test.out b/src/test/regress/expected/worker_split_binary_copy_test.out index 429a76b1d..a0dc3424b 100644 --- a/src/test/regress/expected/worker_split_binary_copy_test.out +++ b/src/test/regress/expected/worker_split_binary_copy_test.out @@ -11,7 +11,7 @@ SELECT create_distributed_table('shard_to_split_copy','id'); (1 row) -INSERT INTO worker_split_binary_copy_test.shard_to_split_copy (id, value) (SELECT g.id, 'c' FROM generate_series(1, 100) AS g(id)); +INSERT INTO worker_split_binary_copy_test.shard_to_split_copy (id, value) (SELECT g.id, 'c' FROM generate_series(1, 1000) AS g(id)); -- END: Create distributed table and insert data. -- BEGIN: Switch to Worker1, Create target shards in worker for local 2-way split copy. \c - - - :worker_1_port @@ -28,7 +28,7 @@ CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy_81060016 (id bigs SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060000; count --------------------------------------------------------------------- - 100 + 1000 (1 row) SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060015; @@ -112,13 +112,13 @@ SELECT * from worker_split_copy( SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060015; count --------------------------------------------------------------------- - 72 + 748 (1 row) SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060016; count --------------------------------------------------------------------- - 28 + 252 (1 row) -- END: List updated row count for local targets shard. @@ -127,13 +127,13 @@ SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060016; SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060015; count --------------------------------------------------------------------- - 72 + 748 (1 row) SELECT COUNT(*) FROM worker_split_binary_copy_test.shard_to_split_copy_81060016; count --------------------------------------------------------------------- - 28 + 252 (1 row) -- END: List updated row count for remote targets shard. diff --git a/src/test/regress/expected/worker_split_text_copy_test.out b/src/test/regress/expected/worker_split_text_copy_test.out new file mode 100644 index 000000000..a46acab10 --- /dev/null +++ b/src/test/regress/expected/worker_split_text_copy_test.out @@ -0,0 +1,147 @@ +CREATE SCHEMA worker_split_text_copy_test; +SET search_path TO worker_split_text_copy_test; +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81070000; +-- BEGIN: Create distributed table and insert data. +-- Add a complex type to table that will force text / non-binary copy. +CREATE TYPE worker_split_text_copy_test.complex_type_to_force_text_copy AS (value char, numval int); +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +SELECT create_distributed_table('shard_to_split_copy','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO worker_split_text_copy_test.shard_to_split_copy (id, complexvalue) (SELECT g.id, ROW('c', g.id)::worker_split_text_copy_test.complex_type_to_force_text_copy FROM generate_series(1, 1000) AS g(id)); +-- END: Create distributed table and insert data. +-- BEGIN: Switch to Worker1, Create target shards in worker for local 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070015 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070016 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +-- End: Switch to Worker1, Create target shards in worker for local 2-way split copy. +-- BEGIN: Switch to Worker2, Create target shards in worker for remote 2-way split copy. +\c - - - :worker_2_port +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070015 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070016 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +-- End: Switch to Worker2, Create target shards in worker for remote 2-way split copy. +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070000; + count +--------------------------------------------------------------------- + 1000 +(1 row) + +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- END: List row count for source shard and targets shard in Worker1. +-- BEGIN: List row count for target shard in Worker2. +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- END: List row count for targets shard in Worker2. +-- BEGIN: Set worker_1_node and worker_2_node +\c - - - :worker_1_port +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END: Set worker_1_node and worker_2_node +-- BEGIN: Trigger 2-way local shard split copy. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + 1073741823, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + 1073741824, --split range begin + 2147483647, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); + worker_split_copy +--------------------------------------------------------------------- + +(1 row) + +-- END: Trigger 2-way local shard split copy. +-- BEGIN: Trigger 2-way remote shard split copy. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + 1073741823, --split range end + :worker_2_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + 1073741824, --split range begin + 2147483647, --split range end + :worker_2_node)::citus.split_copy_info + ] + ); + worker_split_copy +--------------------------------------------------------------------- + +(1 row) + +-- END: Trigger 2-way remote shard split copy. +-- BEGIN: List updated row count for local targets shard. +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; + count +--------------------------------------------------------------------- + 748 +(1 row) + +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; + count +--------------------------------------------------------------------- + 252 +(1 row) + +-- END: List updated row count for local targets shard. +-- BEGIN: List updated row count for remote targets shard. +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; + count +--------------------------------------------------------------------- + 748 +(1 row) + +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; + count +--------------------------------------------------------------------- + 252 +(1 row) + +-- END: List updated row count for remote targets shard. +-- BEGIN: CLEANUP. +\c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA citus_split_shard_by_split_points_local CASCADE; +ERROR: schema "citus_split_shard_by_split_points_local" does not exist +-- END: CLEANUP. diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 1dd462aef..344b805b9 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -6,3 +6,4 @@ test: multi_test_catalog_views test: tablespace # Split tests go here. test: worker_split_binary_copy_test +test: worker_split_text_copy_test diff --git a/src/test/regress/sql/worker_split_binary_copy_test.sql b/src/test/regress/sql/worker_split_binary_copy_test.sql index 8e5416bb3..7782dac64 100644 --- a/src/test/regress/sql/worker_split_binary_copy_test.sql +++ b/src/test/regress/sql/worker_split_binary_copy_test.sql @@ -7,7 +7,7 @@ SET citus.next_shard_id TO 81060000; -- BEGIN: Create distributed table and insert data. CREATE TABLE worker_split_binary_copy_test.shard_to_split_copy (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('shard_to_split_copy','id'); -INSERT INTO worker_split_binary_copy_test.shard_to_split_copy (id, value) (SELECT g.id, 'c' FROM generate_series(1, 100) AS g(id)); +INSERT INTO worker_split_binary_copy_test.shard_to_split_copy (id, value) (SELECT g.id, 'c' FROM generate_series(1, 1000) AS g(id)); -- END: Create distributed table and insert data. -- BEGIN: Switch to Worker1, Create target shards in worker for local 2-way split copy. diff --git a/src/test/regress/sql/worker_split_text_copy_test.sql b/src/test/regress/sql/worker_split_text_copy_test.sql new file mode 100644 index 000000000..0936c0e41 --- /dev/null +++ b/src/test/regress/sql/worker_split_text_copy_test.sql @@ -0,0 +1,99 @@ +CREATE SCHEMA worker_split_text_copy_test; +SET search_path TO worker_split_text_copy_test; +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81070000; + +-- BEGIN: Create distributed table and insert data. +-- Add a complex type to table that will force text / non-binary copy. +CREATE TYPE worker_split_text_copy_test.complex_type_to_force_text_copy AS (value char, numval int); + +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +SELECT create_distributed_table('shard_to_split_copy','id'); + +INSERT INTO worker_split_text_copy_test.shard_to_split_copy (id, complexvalue) (SELECT g.id, ROW('c', g.id)::worker_split_text_copy_test.complex_type_to_force_text_copy FROM generate_series(1, 1000) AS g(id)); +-- END: Create distributed table and insert data. + +-- BEGIN: Switch to Worker1, Create target shards in worker for local 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070015 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070016 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +-- End: Switch to Worker1, Create target shards in worker for local 2-way split copy. + +-- BEGIN: Switch to Worker2, Create target shards in worker for remote 2-way split copy. +\c - - - :worker_2_port +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070015 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +CREATE TABLE worker_split_text_copy_test.shard_to_split_copy_81070016 (id bigserial PRIMARY KEY, complexvalue worker_split_text_copy_test.complex_type_to_force_text_copy); +-- End: Switch to Worker2, Create target shards in worker for remote 2-way split copy. + +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070000; +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; +-- END: List row count for source shard and targets shard in Worker1. + +-- BEGIN: List row count for target shard in Worker2. +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; +-- END: List row count for targets shard in Worker2. + +-- BEGIN: Set worker_1_node and worker_2_node +\c - - - :worker_1_port +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END: Set worker_1_node and worker_2_node + +-- BEGIN: Trigger 2-way local shard split copy. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + 1073741823, --split range end + :worker_1_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + 1073741824, --split range begin + 2147483647, --split range end + :worker_1_node)::citus.split_copy_info + ] + ); +-- END: Trigger 2-way local shard split copy. + +-- BEGIN: Trigger 2-way remote shard split copy. +SELECT * from worker_split_copy( + 81070000, -- source shard id to copy + ARRAY[ + -- split copy info for split children 1 + ROW(81070015, -- destination shard id + -2147483648, -- split range begin + 1073741823, --split range end + :worker_2_node)::citus.split_copy_info, + -- split copy info for split children 2 + ROW(81070016, --destination shard id + 1073741824, --split range begin + 2147483647, --split range end + :worker_2_node)::citus.split_copy_info + ] + ); +-- END: Trigger 2-way remote shard split copy. + +-- BEGIN: List updated row count for local targets shard. +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; +-- END: List updated row count for local targets shard. + +-- BEGIN: List updated row count for remote targets shard. +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070015; +SELECT COUNT(*) FROM worker_split_text_copy_test.shard_to_split_copy_81070016; +-- END: List updated row count for remote targets shard. + +-- BEGIN: CLEANUP. +\c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA citus_split_shard_by_split_points_local CASCADE; +-- END: CLEANUP.