mirror of https://github.com/citusdata/citus.git
Exclude-Generated-Columns-In-Copy (#6721)
DESCRIPTION: Fixes a bug in shard copy operations.
For copying shards in both shard move and shard split operations, Citus
uses the COPY statement.
A COPY all statement in the following form
` COPY target_shard FROM STDIN;`
throws an error when there is a GENERATED column in the shard table.
In order to fix this issue, we need to exclude the GENERATED columns in
the COPY and the matching SELECT statements. Hence this fix converts the
COPY and SELECT all statements to the following form:
```
COPY target_shard (col1, col2, ..., coln) FROM STDIN;
SELECT col1, col2, ..., coln FROM source_shard;
```
where (col1, col2, ..., coln) does not include a GENERATED column.
GENERATED column values are created in the target_shard as the values
are inserted.
Fixes #6705.
---------
Co-authored-by: Teja Mupparti <temuppar@microsoft.com>
Co-authored-by: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>
Co-authored-by: Gürkan İndibay <gindibay@microsoft.com>
(cherry picked from commit 4043abd5aa)
release-11.1-emel
parent
20141223b6
commit
dec4a9c012
|
@ -53,8 +53,14 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS)
|
|||
targetNodeId);
|
||||
|
||||
StringInfo selectShardQueryForCopy = makeStringInfo();
|
||||
|
||||
/*
|
||||
* Even though we do COPY(SELECT ...) all the columns, we can't just do SELECT * because we must not COPY generated columns.
|
||||
*/
|
||||
const char *columnList = CopyableColumnNamesFromRelationName(relationSchemaName,
|
||||
relationName);
|
||||
appendStringInfo(selectShardQueryForCopy,
|
||||
"SELECT * FROM %s;", relationQualifiedName);
|
||||
"SELECT %s FROM %s;", columnList, relationQualifiedName);
|
||||
|
||||
ParamListInfo params = NULL;
|
||||
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
|
||||
|
|
|
@ -73,7 +73,7 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
|
|||
static bool CanUseLocalCopy(uint32_t destinationNodeId);
|
||||
static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName,
|
||||
bool
|
||||
useBinaryFormat);
|
||||
useBinaryFormat, TupleDesc tupleDesc);
|
||||
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
|
||||
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
|
||||
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
|
||||
|
@ -105,7 +105,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
|
|||
|
||||
StringInfo copyStatement = ConstructShardCopyStatement(
|
||||
copyDest->destinationShardFullyQualifiedName,
|
||||
copyDest->copyOutState->binary);
|
||||
copyDest->copyOutState->binary,
|
||||
copyDest->tupleDescriptor);
|
||||
|
||||
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
|
||||
{
|
||||
|
@ -344,21 +345,80 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyableColumnNamesFromTupleDesc function creates and returns a comma seperated column names string to be used in COPY
|
||||
* and SELECT statements when copying a table. The COPY and SELECT statements should filter out the GENERATED columns since COPY
|
||||
* statement fails to handle them. Iterating over the attributes of the table we also need to skip the dropped columns.
|
||||
*/
|
||||
const char *
|
||||
CopyableColumnNamesFromTupleDesc(TupleDesc tupDesc)
|
||||
{
|
||||
StringInfo columnList = makeStringInfo();
|
||||
bool firstInList = true;
|
||||
|
||||
for (int i = 0; i < tupDesc->natts; i++)
|
||||
{
|
||||
Form_pg_attribute att = TupleDescAttr(tupDesc, i);
|
||||
if (att->attgenerated || att->attisdropped)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (!firstInList)
|
||||
{
|
||||
appendStringInfo(columnList, ",");
|
||||
}
|
||||
|
||||
firstInList = false;
|
||||
|
||||
appendStringInfo(columnList, "%s", quote_identifier(NameStr(att->attname)));
|
||||
}
|
||||
|
||||
return columnList->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyableColumnNamesFromRelationName function is a wrapper for CopyableColumnNamesFromTupleDesc.
|
||||
*/
|
||||
const char *
|
||||
CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName)
|
||||
{
|
||||
Oid namespaceOid = get_namespace_oid(schemaName, true);
|
||||
|
||||
Oid relationId = get_relname_relid(relationName, namespaceOid);
|
||||
|
||||
Relation relation = relation_open(relationId, AccessShareLock);
|
||||
|
||||
TupleDesc tupleDesc = RelationGetDescr(relation);
|
||||
|
||||
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
return columnList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConstructShardCopyStatement constructs the text of a COPY statement
|
||||
* for copying into a result table
|
||||
*/
|
||||
static StringInfo
|
||||
ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
|
||||
useBinaryFormat)
|
||||
useBinaryFormat,
|
||||
TupleDesc tupleDesc)
|
||||
{
|
||||
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
|
||||
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
|
||||
|
||||
|
||||
StringInfo command = makeStringInfo();
|
||||
appendStringInfo(command, "COPY %s.%s FROM STDIN",
|
||||
|
||||
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
|
||||
|
||||
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
|
||||
quote_identifier(destinationShardSchemaName), quote_identifier(
|
||||
destinationShardRelationName));
|
||||
destinationShardRelationName), columnList);
|
||||
|
||||
if (useBinaryFormat)
|
||||
{
|
||||
|
|
|
@ -110,8 +110,13 @@ worker_split_copy(PG_FUNCTION_ARGS)
|
|||
splitCopyInfoList))));
|
||||
|
||||
StringInfo selectShardQueryForCopy = makeStringInfo();
|
||||
const char *columnList = CopyableColumnNamesFromRelationName(
|
||||
sourceShardToCopySchemaName,
|
||||
sourceShardToCopyName);
|
||||
|
||||
appendStringInfo(selectShardQueryForCopy,
|
||||
"SELECT * FROM %s;", sourceShardToCopyQualifiedName);
|
||||
"SELECT %s FROM %s;", columnList,
|
||||
sourceShardToCopyQualifiedName);
|
||||
|
||||
ParamListInfo params = NULL;
|
||||
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
|
||||
|
|
|
@ -19,4 +19,9 @@ extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
|
|||
List *destinationShardFullyQualifiedName,
|
||||
uint32_t destinationNodeId);
|
||||
|
||||
extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const
|
||||
char *relationName);
|
||||
|
||||
extern const char * CopyableColumnNamesFromTupleDesc(TupleDesc tupdesc);
|
||||
|
||||
#endif /* WORKER_SHARD_COPY_H_ */
|
||||
|
|
|
@ -60,7 +60,7 @@ SELECT create_reference_table('reference_table');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||
create_distributed_table
|
||||
|
@ -84,8 +84,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
|
|||
-- END : Create Foreign key constraints.
|
||||
-- BEGIN : Load data into tables.
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
|
||||
SELECT COUNT(*) FROM sensors;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -56,7 +56,7 @@ SELECT create_reference_table('reference_table');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||
create_distributed_table
|
||||
|
@ -80,8 +80,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
|
|||
-- END : Create Foreign key constraints.
|
||||
-- BEGIN : Load data into tables.
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
|
||||
SELECT COUNT(*) FROM sensors;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -238,8 +238,40 @@ ORDER BY
|
|||
LIMIT 1 OFFSET 1;
|
||||
ERROR: operation is not allowed on this node
|
||||
HINT: Connect to the coordinator and run it again.
|
||||
-- Check that shards of a table with GENERATED columns can be moved.
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
|
||||
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Check that dropped columns are handled properly in a move.
|
||||
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
|
||||
-- Move a shard from worker 1 to worker 2
|
||||
SELECT
|
||||
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
|
||||
FROM
|
||||
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
|
||||
WHERE
|
||||
logicalrelid = 'mx_table_with_generated_column'::regclass
|
||||
AND nodeport = :worker_1_port
|
||||
ORDER BY
|
||||
shardid
|
||||
LIMIT 1;
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Cleanup
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
DROP TABLE mx_table_with_generated_column;
|
||||
DROP TABLE mx_table_1;
|
||||
DROP TABLE mx_table_2;
|
||||
DROP TABLE mx_table_3;
|
||||
|
|
|
@ -142,8 +142,90 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
|
|||
(1 row)
|
||||
|
||||
-- END: List updated row count for local targets shard.
|
||||
-- Check that GENERATED columns are handled properly in a shard split operation.
|
||||
\c - - - :master_port
|
||||
SET search_path TO worker_split_copy_test;
|
||||
SET citus.shard_count TO 2;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.next_shard_id TO 81080000;
|
||||
-- BEGIN: Create distributed table and insert data.
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
|
||||
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Check that dropped columns are filtered out in COPY command.
|
||||
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
|
||||
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
|
||||
-- END: Create distributed table and insert data.
|
||||
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
|
||||
\c - - - :worker_1_port
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
|
||||
\c - - - :worker_2_port
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
|
||||
-- BEGIN: List row count for source shard and targets shard in Worker1.
|
||||
\c - - - :worker_1_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
510
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- BEGIN: List row count for target shard in Worker2.
|
||||
\c - - - :worker_2_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT * from worker_split_copy(
|
||||
81080000, -- source shard id to copy
|
||||
'id',
|
||||
ARRAY[
|
||||
-- split copy info for split children 1
|
||||
ROW(81080015, -- destination shard id
|
||||
-2147483648, -- split range begin
|
||||
-1073741824, --split range end
|
||||
:worker_1_node)::pg_catalog.split_copy_info,
|
||||
-- split copy info for split children 2
|
||||
ROW(81080016, --destination shard id
|
||||
-1073741823, --split range begin
|
||||
-1, --split range end
|
||||
:worker_2_node)::pg_catalog.split_copy_info
|
||||
]
|
||||
);
|
||||
worker_split_copy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
247
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
263
|
||||
(1 row)
|
||||
|
||||
-- BEGIN: CLEANUP.
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
DROP SCHEMA worker_split_copy_test CASCADE;
|
||||
-- END: CLEANUP.
|
||||
|
|
|
@ -53,7 +53,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
|
|||
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_reference_table('reference_table');
|
||||
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||
|
||||
|
@ -70,9 +70,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
|
|||
|
||||
-- BEGIN : Load data into tables.
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||
|
||||
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
|
||||
|
||||
SELECT COUNT(*) FROM sensors;
|
||||
SELECT COUNT(*) FROM reference_table;
|
||||
SELECT COUNT(*) FROM colocated_dist_table;
|
||||
|
|
|
@ -49,7 +49,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
|
|||
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_reference_table('reference_table');
|
||||
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
|
||||
|
||||
|
@ -66,9 +66,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
|
|||
|
||||
-- BEGIN : Load data into tables.
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
|
||||
|
||||
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
|
||||
|
||||
SELECT COUNT(*) FROM sensors;
|
||||
SELECT COUNT(*) FROM reference_table;
|
||||
SELECT COUNT(*) FROM colocated_dist_table;
|
||||
|
|
|
@ -151,8 +151,34 @@ ORDER BY
|
|||
shardid
|
||||
LIMIT 1 OFFSET 1;
|
||||
|
||||
-- Check that shards of a table with GENERATED columns can be moved.
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
|
||||
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
|
||||
|
||||
-- Check that dropped columns are handled properly in a move.
|
||||
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
|
||||
|
||||
-- Move a shard from worker 1 to worker 2
|
||||
SELECT
|
||||
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
|
||||
FROM
|
||||
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
|
||||
WHERE
|
||||
logicalrelid = 'mx_table_with_generated_column'::regclass
|
||||
AND nodeport = :worker_1_port
|
||||
ORDER BY
|
||||
shardid
|
||||
LIMIT 1;
|
||||
|
||||
-- Cleanup
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
DROP TABLE mx_table_with_generated_column;
|
||||
DROP TABLE mx_table_1;
|
||||
DROP TABLE mx_table_2;
|
||||
DROP TABLE mx_table_3;
|
||||
|
|
|
@ -110,8 +110,66 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
|
|||
SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016";
|
||||
-- END: List updated row count for local targets shard.
|
||||
|
||||
-- Check that GENERATED columns are handled properly in a shard split operation.
|
||||
\c - - - :master_port
|
||||
SET search_path TO worker_split_copy_test;
|
||||
SET citus.shard_count TO 2;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.next_shard_id TO 81080000;
|
||||
|
||||
-- BEGIN: Create distributed table and insert data.
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
|
||||
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
|
||||
|
||||
-- Check that dropped columns are filtered out in COPY command.
|
||||
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
|
||||
|
||||
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
|
||||
|
||||
-- END: Create distributed table and insert data.
|
||||
|
||||
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
|
||||
\c - - - :worker_1_port
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
|
||||
\c - - - :worker_2_port
|
||||
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
|
||||
|
||||
-- BEGIN: List row count for source shard and targets shard in Worker1.
|
||||
\c - - - :worker_1_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
|
||||
|
||||
-- BEGIN: List row count for target shard in Worker2.
|
||||
\c - - - :worker_2_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT * from worker_split_copy(
|
||||
81080000, -- source shard id to copy
|
||||
'id',
|
||||
ARRAY[
|
||||
-- split copy info for split children 1
|
||||
ROW(81080015, -- destination shard id
|
||||
-2147483648, -- split range begin
|
||||
-1073741824, --split range end
|
||||
:worker_1_node)::pg_catalog.split_copy_info,
|
||||
-- split copy info for split children 2
|
||||
ROW(81080016, --destination shard id
|
||||
-1073741823, --split range begin
|
||||
-1, --split range end
|
||||
:worker_2_node)::pg_catalog.split_copy_info
|
||||
]
|
||||
);
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
|
||||
|
||||
-- BEGIN: CLEANUP.
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO WARNING;
|
||||
CALL citus_cleanup_orphaned_resources();
|
||||
DROP SCHEMA worker_split_copy_test CASCADE;
|
||||
-- END: CLEANUP.
|
||||
|
|
Loading…
Reference in New Issue