Exclude-Generated-Columns-In-Copy (#6721)

DESCRIPTION: Fixes a bug in shard copy operations.

For copying shards in both shard move and shard split operations, Citus
uses the COPY statement.

A COPY statement without an explicit column list, in the following form
`COPY target_shard FROM STDIN;`
throws an error when there is a GENERATED column in the shard table.

In order to fix this issue, we need to exclude the GENERATED columns from
both the COPY and the matching SELECT statement. Hence this fix converts the
COPY and SELECT statements to use an explicit column list, as follows:
```
COPY target_shard (col1, col2, ..., coln) FROM STDIN;
SELECT col1, col2, ..., coln FROM source_shard;
```
where the column list col1, col2, ..., coln does not include any GENERATED
column. GENERATED column values are computed in the target_shard as the rows
are inserted.

Fixes #6705.

---------

Co-authored-by: Teja Mupparti <temuppar@microsoft.com>
Co-authored-by: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>
Co-authored-by: Gürkan İndibay <gindibay@microsoft.com>
pull/6742/head^2
Emel Şimşek 2023-03-07 18:15:50 +03:00 committed by GitHub
parent 03f1bb70b7
commit 4043abd5aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 295 additions and 15 deletions

View File

@ -53,8 +53,14 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS)
targetNodeId);
StringInfo selectShardQueryForCopy = makeStringInfo();
/*
* Even though we COPY (SELECT ...) all the columns, we can't just do SELECT * because generated columns must not be copied.
*/
const char *columnList = CopyableColumnNamesFromRelationName(relationSchemaName,
relationName);
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", relationQualifiedName);
"SELECT %s FROM %s;", columnList, relationQualifiedName);
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,

View File

@ -73,7 +73,7 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint32_t destinationNodeId);
static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName,
bool
useBinaryFormat);
useBinaryFormat, TupleDesc tupleDesc);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
@ -105,7 +105,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
@ -344,21 +345,80 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
}
/*
 * CopyableColumnNamesFromTupleDesc builds the comma separated list of column
 * names to be used in the COPY and SELECT statements that copy a table.
 *
 * GENERATED columns are left out because the COPY statement fails to handle
 * them. Dropped columns are left out as well, since they still show up while
 * iterating over the attributes of the tuple descriptor.
 *
 * Every name is passed through quote_identifier before it is appended. When
 * no attribute qualifies, nothing is appended to the list. The returned
 * pointer is the data buffer of the StringInfo built here.
 */
const char *
CopyableColumnNamesFromTupleDesc(TupleDesc tupDesc)
{
	StringInfo copyableColumns = makeStringInfo();

	/* empty before the first emitted name, "," before every later one */
	const char *separator = "";

	for (int attrIndex = 0; attrIndex < tupDesc->natts; attrIndex++)
	{
		Form_pg_attribute attribute = TupleDescAttr(tupDesc, attrIndex);
		bool isCopyable = !attribute->attgenerated && !attribute->attisdropped;

		if (isCopyable)
		{
			appendStringInfo(copyableColumns, "%s%s", separator,
							 quote_identifier(NameStr(attribute->attname)));
			separator = ",";
		}
	}

	return copyableColumns->data;
}
/*
* CopyableColumnNamesFromRelationName function is a wrapper for CopyableColumnNamesFromTupleDesc.
*/
const char *
CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName)
{
Oid namespaceOid = get_namespace_oid(schemaName, true);
Oid relationId = get_relname_relid(relationName, namespaceOid);
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDesc = RelationGetDescr(relation);
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
relation_close(relation, NoLock);
return columnList;
}
/*
* ConstructShardCopyStatement constructs the text of a COPY statement
* for copying into the destination shard
*/
static StringInfo
ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat)
useBinaryFormat,
TupleDesc tupleDesc)
{
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY %s.%s FROM STDIN",
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
quote_identifier(destinationShardSchemaName), quote_identifier(
destinationShardRelationName));
destinationShardRelationName), columnList);
if (useBinaryFormat)
{

View File

@ -110,8 +110,13 @@ worker_split_copy(PG_FUNCTION_ARGS)
splitCopyInfoList))));
StringInfo selectShardQueryForCopy = makeStringInfo();
const char *columnList = CopyableColumnNamesFromRelationName(
sourceShardToCopySchemaName,
sourceShardToCopyName);
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", sourceShardToCopyQualifiedName);
"SELECT %s FROM %s;", columnList,
sourceShardToCopyQualifiedName);
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,

View File

@ -19,4 +19,9 @@ extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
List *destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
/*
 * Returns the comma separated, quoted column names of the given relation
 * that can be used in COPY and SELECT statements; GENERATED and dropped
 * columns are excluded.
 */
extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const
char *relationName);
/* Same as above, but works directly on a tuple descriptor. */
extern const char * CopyableColumnNamesFromTupleDesc(TupleDesc tupdesc);
#endif /* WORKER_SHARD_COPY_H_ */

View File

@ -60,7 +60,7 @@ SELECT create_reference_table('reference_table');
(1 row)
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
create_distributed_table
@ -84,8 +84,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------

View File

@ -56,7 +56,7 @@ SELECT create_reference_table('reference_table');
(1 row)
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
create_distributed_table
@ -80,8 +80,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------

View File

@ -238,8 +238,40 @@ ORDER BY
LIMIT 1 OFFSET 1;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
-- Check that shards of a table with GENERATED columns can be moved.
\c - - - :master_port
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Check that dropped columns are handled properly in a move.
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
-- Move a shard from worker 1 to worker 2
SELECT
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_table_with_generated_column'::regclass
AND nodeport = :worker_1_port
ORDER BY
shardid
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- Cleanup
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP TABLE mx_table_with_generated_column;
DROP TABLE mx_table_1;
DROP TABLE mx_table_2;
DROP TABLE mx_table_3;

View File

@ -142,8 +142,90 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
(1 row)
-- END: List updated row count for local targets shard.
-- Check that GENERATED columns are handled properly in a shard split operation.
\c - - - :master_port
SET search_path TO worker_split_copy_test;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 81080000;
-- BEGIN: Create distributed table and insert data.
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Check that dropped columns are filtered out in COPY command.
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
-- END: Create distributed table and insert data.
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
\c - - - :worker_1_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
\c - - - :worker_2_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
-- BEGIN: List row count for source shard and targets shard in Worker1.
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
count
---------------------------------------------------------------------
510
(1 row)
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
count
---------------------------------------------------------------------
0
(1 row)
-- BEGIN: List row count for target shard in Worker2.
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_1_port
SELECT * from worker_split_copy(
81080000, -- source shard id to copy
'id',
ARRAY[
-- split copy info for split children 1
ROW(81080015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::pg_catalog.split_copy_info,
-- split copy info for split children 2
ROW(81080016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_2_node)::pg_catalog.split_copy_info
]
);
worker_split_copy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
count
---------------------------------------------------------------------
247
(1 row)
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
count
---------------------------------------------------------------------
263
(1 row)
-- BEGIN: CLEANUP.
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP SCHEMA worker_split_copy_test CASCADE;
-- END: CLEANUP.

View File

@ -53,7 +53,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
@ -70,9 +70,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;

View File

@ -49,7 +49,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
@ -66,9 +66,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;

View File

@ -151,8 +151,34 @@ ORDER BY
shardid
LIMIT 1 OFFSET 1;
-- Check that shards of a table with GENERATED columns can be moved.
\c - - - :master_port
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
-- Check that dropped columns are handled properly in a move.
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
-- Move a shard from worker 1 to worker 2
SELECT
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_table_with_generated_column'::regclass
AND nodeport = :worker_1_port
ORDER BY
shardid
LIMIT 1;
-- Cleanup
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP TABLE mx_table_with_generated_column;
DROP TABLE mx_table_1;
DROP TABLE mx_table_2;
DROP TABLE mx_table_3;

View File

@ -110,8 +110,66 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016";
-- END: List updated row count for local targets shard.
-- Check that GENERATED columns are handled properly in a shard split operation.
\c - - - :master_port
SET search_path TO worker_split_copy_test;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 81080000;
-- BEGIN: Create distributed table and insert data.
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
-- Check that dropped columns are filtered out in COPY command.
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
-- END: Create distributed table and insert data.
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
\c - - - :worker_1_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
\c - - - :worker_2_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
-- BEGIN: List row count for source shard and targets shard in Worker1.
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
-- BEGIN: List row count for target shard in Worker2.
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
\c - - - :worker_1_port
SELECT * from worker_split_copy(
81080000, -- source shard id to copy
'id',
ARRAY[
-- split copy info for split children 1
ROW(81080015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::pg_catalog.split_copy_info,
-- split copy info for split children 2
ROW(81080016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_2_node)::pg_catalog.split_copy_info
]
);
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
-- BEGIN: CLEANUP.
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP SCHEMA worker_split_copy_test CASCADE;
-- END: CLEANUP.