diff --git a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c index 7af80ef55..f0f83744d 100644 --- a/src/backend/distributed/operations/worker_copy_table_to_node_udf.c +++ b/src/backend/distributed/operations/worker_copy_table_to_node_udf.c @@ -53,8 +53,14 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS) targetNodeId); StringInfo selectShardQueryForCopy = makeStringInfo(); + + /* + * Even though we do a COPY(SELECT ...) of all the columns, we can't just do SELECT * because we must not COPY generated columns. + */ + const char *columnList = CopyableColumnNamesFromRelationName(relationSchemaName, + relationName); appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", relationQualifiedName); + "SELECT %s FROM %s;", columnList, relationQualifiedName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 9239caffb..e9c2af512 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -73,7 +73,7 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint32_t destinationNodeId); static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat); + useBinaryFormat, TupleDesc tupleDesc); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState @@ -105,7 +105,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary); + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) { @@ -344,21 +345,80 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) } +/* + * CopyableColumnNamesFromTupleDesc creates and returns a comma-separated string of column names to be used in the COPY + * and SELECT statements when copying a table. The COPY and SELECT statements must filter out GENERATED columns, since the COPY + * statement cannot handle them. While iterating over the attributes of the table we also skip dropped columns. + */ +const char * +CopyableColumnNamesFromTupleDesc(TupleDesc tupDesc) +{ + StringInfo columnList = makeStringInfo(); + bool firstInList = true; + + for (int i = 0; i < tupDesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, i); + if (att->attgenerated || att->attisdropped) + { + continue; + } + if (!firstInList) + { + appendStringInfo(columnList, ","); + } + + firstInList = false; + + appendStringInfo(columnList, "%s", quote_identifier(NameStr(att->attname))); + } + + return columnList->data; +} + + +/* + * CopyableColumnNamesFromRelationName is a wrapper around CopyableColumnNamesFromTupleDesc that opens the relation by schema and name.
+ */ +const char * +CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName) +{ + Oid namespaceOid = get_namespace_oid(schemaName, true); + + Oid relationId = get_relname_relid(relationName, namespaceOid); + + Relation relation = relation_open(relationId, AccessShareLock); + + TupleDesc tupleDesc = RelationGetDescr(relation); + + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + + relation_close(relation, NoLock); + + return columnList; +} + + /* * ConstructShardCopyStatement constructs the text of a COPY statement * for copying into a result table */ static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat) + useBinaryFormat, + TupleDesc tupleDesc) { char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); + StringInfo command = makeStringInfo(); - appendStringInfo(command, "COPY %s.%s FROM STDIN", + + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + + appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", quote_identifier(destinationShardSchemaName), quote_identifier( - destinationShardRelationName)); + destinationShardRelationName), columnList); if (useBinaryFormat) { diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index b96475992..c154ac040 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -110,8 +110,13 @@ worker_split_copy(PG_FUNCTION_ARGS) splitCopyInfoList)))); StringInfo selectShardQueryForCopy = makeStringInfo(); + const char *columnList = CopyableColumnNamesFromRelationName( + sourceShardToCopySchemaName, + sourceShardToCopyName); + appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", sourceShardToCopyQualifiedName); + "SELECT %s FROM %s;", columnList, + sourceShardToCopyQualifiedName); ParamListInfo params = NULL; ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index 2ab2775f9..77f57c761 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -19,4 +19,9 @@ extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState, List *destinationShardFullyQualifiedName, uint32_t destinationNodeId); +extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const + char *relationName); + +extern const char * CopyableColumnNamesFromTupleDesc(TupleDesc tupdesc); + #endif /* WORKER_SHARD_COPY_H_ */ diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out index d6dde8b7a..fe3cade55 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shards.out +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -60,7 +60,7 @@ SELECT create_reference_table('reference_table'); (1 row) -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); 
create_distributed_table @@ -84,8 +84,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- END : Create Foreign key constraints. -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; SELECT COUNT(*) FROM sensors; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 87f50da31..13f3b7a36 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -56,7 +56,7 @@ SELECT create_reference_table('reference_table'); (1 row) -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); create_distributed_table @@ -80,8 +80,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- END : Create Foreign key constraints. -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; SELECT COUNT(*) FROM sensors; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_move_mx.out b/src/test/regress/expected/multi_move_mx.out index 833c9f7df..b6cc5d0d7 100644 --- a/src/test/regress/expected/multi_move_mx.out +++ b/src/test/regress/expected/multi_move_mx.out @@ -238,8 +238,40 @@ ORDER BY LIMIT 1 OFFSET 1; ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. +-- Check that shards of a table with GENERATED columns can be moved. +\c - - - :master_port +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int); +SELECT create_distributed_table('mx_table_with_generated_column', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Check that dropped columns are handled properly in a move. 
+ALTER TABLE mx_table_with_generated_column DROP COLUMN c; +-- Move a shard from worker 1 to worker 2 +SELECT + citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_table_with_generated_column'::regclass + AND nodeport = :worker_1_port +ORDER BY + shardid +LIMIT 1; + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + -- Cleanup \c - - - :master_port +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE mx_table_with_generated_column; DROP TABLE mx_table_1; DROP TABLE mx_table_2; DROP TABLE mx_table_3; diff --git a/src/test/regress/expected/worker_split_copy_test.out b/src/test/regress/expected/worker_split_copy_test.out index 67d515198..f4fae57e0 100644 --- a/src/test/regress/expected/worker_split_copy_test.out +++ b/src/test/regress/expected/worker_split_copy_test.out @@ -142,8 +142,90 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 (1 row) -- END: List updated row count for local targets shard. +-- Check that GENERATED columns are handled properly in a shard split operation. +\c - - - :master_port +SET search_path TO worker_split_copy_test; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81080000; +-- BEGIN: Create distributed table and insert data. +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int); +SELECT create_distributed_table('dist_table_with_generated_col', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Check that dropped columns are filtered out in COPY command. +ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop; +INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); +-- END: Create distributed table and insert data. +-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +\c - - - :worker_2_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000; + count +--------------------------------------------------------------------- + 510 +(1 row) + +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- BEGIN: List row count for target shard in Worker2. 
+\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +SELECT * from worker_split_copy( + 81080000, -- source shard id to copy + 'id', + ARRAY[ + -- split copy info for split children 1 + ROW(81080015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81080016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_2_node)::pg_catalog.split_copy_info + ] + ); + worker_split_copy +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + count +--------------------------------------------------------------------- + 247 +(1 row) + +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + count +--------------------------------------------------------------------- + 263 +(1 row) + -- BEGIN: CLEANUP. \c - - - :master_port SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); DROP SCHEMA worker_split_copy_test CASCADE; -- END: CLEANUP. diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql index 11275a342..909beac02 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shards.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql @@ -53,7 +53,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); CREATE TABLE reference_table (measureid integer PRIMARY KEY); SELECT create_reference_table('reference_table'); -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); @@ -70,9 +70,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- BEGIN : Load data into tables. 
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; + SELECT COUNT(*) FROM sensors; SELECT COUNT(*) FROM reference_table; SELECT COUNT(*) FROM colocated_dist_table; diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index f5e7f005a..47b28b9d7 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -49,7 +49,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); CREATE TABLE reference_table (measureid integer PRIMARY KEY); SELECT create_reference_table('reference_table'); -CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer); CLUSTER colocated_dist_table USING colocated_dist_table_pkey; SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); @@ -66,9 +66,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE -- BEGIN : Load data into tables. INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; -INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i; INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +ALTER TABLE colocated_dist_table DROP COLUMN col_todrop; + SELECT COUNT(*) FROM sensors; SELECT COUNT(*) FROM reference_table; SELECT COUNT(*) FROM colocated_dist_table; diff --git a/src/test/regress/sql/multi_move_mx.sql b/src/test/regress/sql/multi_move_mx.sql index 166069a6e..9cfa8a3db 100644 --- a/src/test/regress/sql/multi_move_mx.sql +++ b/src/test/regress/sql/multi_move_mx.sql @@ -151,8 +151,34 @@ ORDER BY shardid LIMIT 1 OFFSET 1; +-- Check that shards of a table with GENERATED columns can be moved. +\c - - - :master_port +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int); +SELECT create_distributed_table('mx_table_with_generated_column', 'a'); + +-- Check that dropped columns are handled properly in a move. 
+ALTER TABLE mx_table_with_generated_column DROP COLUMN c; + +-- Move a shard from worker 1 to worker 2 +SELECT + citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_table_with_generated_column'::regclass + AND nodeport = :worker_1_port +ORDER BY + shardid +LIMIT 1; + -- Cleanup \c - - - :master_port +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); +DROP TABLE mx_table_with_generated_column; DROP TABLE mx_table_1; DROP TABLE mx_table_2; DROP TABLE mx_table_3; diff --git a/src/test/regress/sql/worker_split_copy_test.sql b/src/test/regress/sql/worker_split_copy_test.sql index 2fac91c69..e2f4f9a23 100644 --- a/src/test/regress/sql/worker_split_copy_test.sql +++ b/src/test/regress/sql/worker_split_copy_test.sql @@ -110,8 +110,66 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700 SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016"; -- END: List updated row count for local targets shard. +-- Check that GENERATED columns are handled properly in a shard split operation. +\c - - - :master_port +SET search_path TO worker_split_copy_test; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 81080000; + +-- BEGIN: Create distributed table and insert data. +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int); +SELECT create_distributed_table('dist_table_with_generated_col', 'id'); + +-- Check that dropped columns are filtered out in COPY command. +ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop; + +INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id)); + +-- END: Create distributed table and insert data. + +-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy. +\c - - - :worker_1_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); +\c - - - :worker_2_port +CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char); + +-- BEGIN: List row count for source shard and targets shard in Worker1. +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000; +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + +-- BEGIN: List row count for target shard in Worker2. 
+\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + +\c - - - :worker_1_port +SELECT * from worker_split_copy( + 81080000, -- source shard id to copy + 'id', + ARRAY[ + -- split copy info for split children 1 + ROW(81080015, -- destination shard id + -2147483648, -- split range begin + -1073741824, --split range end + :worker_1_node)::pg_catalog.split_copy_info, + -- split copy info for split children 2 + ROW(81080016, --destination shard id + -1073741823, --split range begin + -1, --split range end + :worker_2_node)::pg_catalog.split_copy_info + ] + ); + +\c - - - :worker_1_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015; + +\c - - - :worker_2_port +SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016; + -- BEGIN: CLEANUP. \c - - - :master_port SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_resources(); DROP SCHEMA worker_split_copy_test CASCADE; -- END: CLEANUP.
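A minimal sketch, not part of the patch, of the statements the shard-copy code builds before and after this change; shard and column names are taken from the worker_split_copy_test tables above (id int, new_id int GENERATED ALWAYS AS (id + 3) STORED, value char):

-- Before: the SELECT side produced every column, including the generated
-- new_id, which the COPY into the destination shard has no way to accept,
-- so the copy failed.
--   SELECT * FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
--   COPY worker_split_copy_test.dist_table_with_generated_col_81080015 FROM STDIN;
--
-- After: CopyableColumnNamesFromTupleDesc skips generated and dropped columns,
-- so both statements name only the copyable columns and the destination shard
-- recomputes new_id itself.
SELECT id, value FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
COPY worker_split_copy_test.dist_table_with_generated_col_81080015 (id, value) FROM STDIN;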