mirror of https://github.com/citusdata/citus.git
Add cascade option to undistribute_table
parent
2e3e680ba9
commit
f3801143fb
|
@ -321,6 +321,11 @@ GetCascadeOperationFunction(CascadeOperationType cascadeOperationType)
|
|||
{
|
||||
switch (cascadeOperationType)
|
||||
{
|
||||
case UNDISTRIBUTE_TABLE:
|
||||
{
|
||||
return UndistributeTable;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/*
|
||||
|
|
|
@ -86,6 +86,11 @@
|
|||
*/
|
||||
#define LOG_PER_TUPLE_AMOUNT 1000000
|
||||
|
||||
#define UNDISTRIBUTE_TABLE_CASCADE_HINT \
|
||||
"Use cascade option to undistribute all the relations involved in " \
|
||||
"a foreign key relationship with %s by executing SELECT " \
|
||||
"undistribute_table($$%s$$, cascade_via_foreign_keys=>true)"
|
||||
|
||||
/* Replication model to use when creating distributed tables */
|
||||
int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
|
||||
|
||||
|
@ -124,7 +129,6 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
|
|||
DestReceiver *copyDest,
|
||||
TupleTableSlot *slot,
|
||||
EState *estate);
|
||||
static void UndistributeTable(Oid relationId);
|
||||
static List * GetViewCreationCommandsOfTable(Oid relationId);
|
||||
static void ReplaceTable(Oid sourceId, Oid targetId);
|
||||
|
||||
|
@ -289,10 +293,11 @@ Datum
|
|||
undistribute_table(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool cascadeViaForeignKeys = PG_GETARG_BOOL(1);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
UndistributeTable(relationId);
|
||||
UndistributeTable(relationId, cascadeViaForeignKeys);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -1552,7 +1557,7 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
|
|||
* be dropped.
|
||||
*/
|
||||
void
|
||||
UndistributeTable(Oid relationId)
|
||||
UndistributeTable(Oid relationId, bool cascadeViaForeignKeys)
|
||||
{
|
||||
EnsureCoordinator();
|
||||
EnsureRelationExists(relationId);
|
||||
|
@ -1574,16 +1579,37 @@ UndistributeTable(Oid relationId)
|
|||
errdetail("because the table is not distributed")));
|
||||
}
|
||||
|
||||
if (TableReferencing(relationId))
|
||||
bool tableReferencing = TableReferencing(relationId);
|
||||
bool tableReferenced = TableReferenced(relationId);
|
||||
if (cascadeViaForeignKeys && (tableReferencing || tableReferenced))
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot undistribute table "
|
||||
"because it has a foreign key")));
|
||||
CascadeOperationForConnectedRelations(relationId, lockMode, UNDISTRIBUTE_TABLE);
|
||||
|
||||
/*
|
||||
* Undistributed every foreign key connected relation in our foreign key
|
||||
* subgraph including itself, so return here.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
if (TableReferenced(relationId))
|
||||
if (tableReferencing)
|
||||
{
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
ereport(ERROR, (errmsg("cannot undistribute table "
|
||||
"because a foreign key references to it")));
|
||||
"because it has a foreign key"),
|
||||
errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT,
|
||||
qualifiedRelationName,
|
||||
qualifiedRelationName)));
|
||||
}
|
||||
|
||||
if (tableReferenced)
|
||||
{
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
ereport(ERROR, (errmsg("cannot undistribute table "
|
||||
"because a foreign key references to it"),
|
||||
errhint(UNDISTRIBUTE_TABLE_CASCADE_HINT,
|
||||
qualifiedRelationName,
|
||||
qualifiedRelationName)));
|
||||
}
|
||||
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
|
@ -1642,7 +1668,15 @@ UndistributeTable(Oid relationId)
|
|||
}
|
||||
preLoadCommands = lappend(preLoadCommands,
|
||||
makeTableDDLCommandString(attachPartitionCommand));
|
||||
UndistributeTable(partitionRelationId);
|
||||
|
||||
/*
|
||||
* Even if we called UndistributeTable with cascade option, we
|
||||
* shouldn't cascade via foreign keys on partitions. Otherwise,
|
||||
* we might try to undistribute partitions of other tables in
|
||||
* our foreign key subgraph more than once.
|
||||
*/
|
||||
bool cascadeOnPartitionsViaForeignKeys = false;
|
||||
UndistributeTable(partitionRelationId, cascadeOnPartitionsViaForeignKeys);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,5 +6,6 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
|
|||
#include "udfs/citus_total_relation_size/10.0-1.sql"
|
||||
#include "udfs/citus_tables/10.0-1.sql"
|
||||
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
|
||||
#include "udfs/undistribute_table/10.0-1.sql"
|
||||
|
||||
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
|
||||
DROP VIEW public.citus_tables;
|
||||
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
|
||||
DROP FUNCTION pg_catalog.undistribute_table(regclass,boolean);
|
||||
|
||||
#include "../udfs/citus_total_relation_size/7.0-1.sql"
|
||||
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"
|
||||
#include "../udfs/undistribute_table/9.5-1.sql"
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
DROP FUNCTION pg_catalog.undistribute_table(regclass);
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass, cascade_via_foreign_keys boolean default false)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$undistribute_table$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass, cascade_via_foreign_keys boolean)
|
||||
IS 'undistributes a distributed table';
|
|
@ -1,9 +1,10 @@
|
|||
DROP FUNCTION pg_catalog.undistribute_table(regclass);
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
table_name regclass, cascade_via_foreign_keys boolean default false)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$undistribute_table$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
table_name regclass, cascade_via_foreign_keys boolean)
|
||||
IS 'undistributes a distributed table';
|
||||
|
|
|
@ -396,6 +396,9 @@ extern Oid GetTriggerFunctionId(Oid triggerId);
|
|||
typedef enum CascadeOperationType
|
||||
{
|
||||
INVALID_OPERATION = 1 << 0,
|
||||
|
||||
/* execute UndistributeTable on each relation */
|
||||
UNDISTRIBUTE_TABLE = 1 << 1,
|
||||
} CascadeOperationType;
|
||||
|
||||
extern void CascadeOperationForConnectedRelations(Oid relationId, LOCKMODE relLockMode,
|
||||
|
|
|
@ -138,6 +138,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
|
|||
char distributionMethod, char *colocateWithTableName,
|
||||
bool viaDeprecatedAPI);
|
||||
extern void CreateTruncateTrigger(Oid relationId);
|
||||
extern void UndistributeTable(Oid relationId, bool cascadeViaForeignKeys);
|
||||
|
||||
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
||||
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
|
||||
|
|
|
@ -446,6 +446,7 @@ SELECT * FROM print_extension_changes();
|
|||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
function citus_total_relation_size(regclass) |
|
||||
function undistribute_table(regclass) |
|
||||
function upgrade_to_reference_table(regclass) |
|
||||
| access method columnar
|
||||
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
|
||||
|
@ -453,13 +454,14 @@ SELECT * FROM print_extension_changes();
|
|||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| function columnar.columnar_handler(internal)
|
||||
| function undistribute_table(regclass,boolean)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(14 rows)
|
||||
(16 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -446,16 +446,18 @@ SELECT * FROM print_extension_changes();
|
|||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
function citus_total_relation_size(regclass) |
|
||||
function undistribute_table(regclass) |
|
||||
function upgrade_to_reference_table(regclass) |
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| function undistribute_table(regclass,boolean)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(10 rows)
|
||||
(12 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -1019,6 +1019,72 @@ SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
|
||||
NOTICE: undistributing the partitions of single_node.partitioned_table_1
|
||||
NOTICE: creating a new local table for single_node.partitioned_table_1_100_200
|
||||
NOTICE: Moving the data of single_node.partitioned_table_1_100_200
|
||||
NOTICE: Dropping the old single_node.partitioned_table_1_100_200
|
||||
NOTICE: Renaming the new table to single_node.partitioned_table_1_100_200
|
||||
NOTICE: creating a new local table for single_node.partitioned_table_1_200_300
|
||||
NOTICE: Moving the data of single_node.partitioned_table_1_200_300
|
||||
NOTICE: Dropping the old single_node.partitioned_table_1_200_300
|
||||
NOTICE: Renaming the new table to single_node.partitioned_table_1_200_300
|
||||
NOTICE: creating a new local table for single_node.partitioned_table_1
|
||||
NOTICE: Moving the data of single_node.partitioned_table_1
|
||||
NOTICE: Dropping the old single_node.partitioned_table_1
|
||||
NOTICE: Renaming the new table to single_node.partitioned_table_1
|
||||
NOTICE: creating a new local table for single_node.reference_table_1
|
||||
NOTICE: Moving the data of single_node.reference_table_1
|
||||
NOTICE: Dropping the old single_node.reference_table_1
|
||||
NOTICE: Renaming the new table to single_node.reference_table_1
|
||||
NOTICE: creating a new local table for single_node.distributed_table_1
|
||||
NOTICE: Moving the data of single_node.distributed_table_1
|
||||
NOTICE: Dropping the old single_node.distributed_table_1
|
||||
NOTICE: Renaming the new table to single_node.distributed_table_1
|
||||
NOTICE: creating a new local table for single_node.citus_local_table_1
|
||||
NOTICE: Moving the data of single_node.citus_local_table_1
|
||||
NOTICE: Dropping the old single_node.citus_local_table_1
|
||||
NOTICE: Renaming the new table to single_node.citus_local_table_1
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO test (x) VALUES ($1);
|
||||
|
|
|
@ -123,8 +123,10 @@ SELECT create_distributed_table('referencing_table', 'id');
|
|||
INSERT INTO referencing_table VALUES (4, 6, 'cba'), (1, 1, 'dcba'), (2, 3, 'aaa');
|
||||
SELECT undistribute_table('referenced_table');
|
||||
ERROR: cannot undistribute table because a foreign key references to it
|
||||
HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referenced_table by executing SELECT undistribute_table($$undistribute_table.referenced_table$$, cascade_via_foreign_keys=>true)
|
||||
SELECT undistribute_table('referencing_table');
|
||||
ERROR: cannot undistribute table because it has a foreign key
|
||||
HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true)
|
||||
DROP TABLE referenced_table, referencing_table;
|
||||
-- test distributed foreign tables
|
||||
-- we expect errors
|
||||
|
|
|
@ -0,0 +1,381 @@
|
|||
\set VERBOSITY terse
|
||||
SET citus.next_shard_id TO 1515000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE SCHEMA undistribute_table_cascade;
|
||||
SET search_path TO undistribute_table_cascade;
|
||||
SET client_min_messages to ERROR;
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE);
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE distributed_table_2 (col_1 INT UNIQUE);
|
||||
CREATE TABLE distributed_table_3 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table_2', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table_3', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_citus_local_table('citus_local_table_2');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- --- ---
|
||||
-- | | | |
|
||||
-- | v | v
|
||||
-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2
|
||||
-- ^ | ^ |
|
||||
-- v | | v
|
||||
-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2
|
||||
--
|
||||
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
-- show that all of below fails as we didn't provide cascade=true
|
||||
SELECT undistribute_table('distributed_table_1');
|
||||
ERROR: cannot undistribute table because it has a foreign key
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false);
|
||||
ERROR: cannot undistribute table because it has a foreign key
|
||||
SELECT undistribute_table('reference_table_2');
|
||||
ERROR: cannot undistribute table because it has a foreign key
|
||||
-- In each of below transation blocks, show that we preserve foreign keys.
|
||||
-- Also show that we don't have any citus tables in current schema after
|
||||
-- undistribute_table(cascade).
|
||||
-- So in each transaction, both selects should return true.
|
||||
BEGIN;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show that we switch to sequential execution as there are
|
||||
-- reference tables in our subgraph
|
||||
show citus.multi_shard_modify_mode;
|
||||
citus.multi_shard_modify_mode
|
||||
---------------------------------------------------------------------
|
||||
sequential
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*)=10 FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*)=10 FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- print foreign keys only in one of xact blocks not to make tests too verbose
|
||||
SELECT conname, conrelid::regclass, confrelid::regclass
|
||||
FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$'
|
||||
ORDER BY conname;
|
||||
conname | conrelid | confrelid
|
||||
---------------------------------------------------------------------
|
||||
fkey_1 | distributed_table_3 | distributed_table_2
|
||||
fkey_11 | distributed_table_2 | distributed_table_2
|
||||
fkey_12 | reference_table_1 | reference_table_1
|
||||
fkey_2 | distributed_table_2 | distributed_table_3
|
||||
fkey_3 | distributed_table_2 | distributed_table_1
|
||||
fkey_4 | distributed_table_1 | reference_table_1
|
||||
fkey_5 | reference_table_2 | reference_table_1
|
||||
fkey_6 | citus_local_table_1 | reference_table_1
|
||||
fkey_7 | reference_table_2 | citus_local_table_2
|
||||
fkey_8 | distributed_table_1 | distributed_table_3
|
||||
(10 rows)
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
SELECT COUNT(*) FROM distributed_table_1;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- show that we error out as select is executed in parallel mode
|
||||
-- and there are reference tables in our subgraph
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
ERROR: cannot execute command because there was a parallel operation on a distributed table in transaction
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
set citus.multi_shard_modify_mode to 'sequential';
|
||||
SELECT COUNT(*) FROM distributed_table_1;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- even if there are reference tables in our subgraph, show that
|
||||
-- we don't error out as we already switched to sequential execution
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4;
|
||||
BEGIN;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- as we splitted distributed_table_1,2 & 3 into a seperate subgraph
|
||||
-- by dropping reference_table_1, we should not switch to sequential
|
||||
-- execution
|
||||
show citus.multi_shard_modify_mode;
|
||||
citus.multi_shard_modify_mode
|
||||
---------------------------------------------------------------------
|
||||
parallel
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- split distributed_table_2 & distributed_table_3 into a seperate foreign
|
||||
-- key subgraph then undistribute them
|
||||
ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3;
|
||||
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- should return true as we undistributed those two tables
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade' AND
|
||||
(tablename='distributed_table_2' OR tablename='distributed_table_3');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- other tables should stay as is since we splited those two tables
|
||||
SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- test partitioned tables
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_2', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE);
|
||||
SELECT create_reference_table('reference_table_3');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
|
||||
ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
|
||||
BEGIN;
|
||||
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- show that we preserve foreign keys on partitions too
|
||||
SELECT conname, conrelid::regclass, confrelid::regclass
|
||||
FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname = 'fkey_9' OR conname = 'fkey_10'
|
||||
ORDER BY 1,2,3;
|
||||
conname | conrelid | confrelid
|
||||
---------------------------------------------------------------------
|
||||
fkey_10 | partitioned_table_2_100_200 | reference_table_3
|
||||
fkey_10 | partitioned_table_2_200_300 | reference_table_3
|
||||
fkey_10 | partitioned_table_2 | reference_table_3
|
||||
fkey_9 | partitioned_table_1_100_200 | reference_table_3
|
||||
fkey_9 | partitioned_table_1_200_300 | reference_table_3
|
||||
fkey_9 | partitioned_table_1 | reference_table_3
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- as pg < 12 doesn't support foreign keys between partitioned tables,
|
||||
-- define below foreign key conditionally instead of adding another
|
||||
-- test output
|
||||
DO $proc$
|
||||
BEGIN
|
||||
IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN
|
||||
EXECUTE
|
||||
$$
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1);
|
||||
$$;
|
||||
END IF;
|
||||
END$proc$;
|
||||
BEGIN;
|
||||
-- For pg versions 11, 12 & 13, partitioned_table_1 references to reference_table_3
|
||||
-- and partitioned_table_2 references to reference_table_3.
|
||||
-- For pg versions > 11, partitioned_table_1 references to partitioned_table_2 as well.
|
||||
-- Anyway show that undistribute_table with cascade is fine.
|
||||
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- now merge partitioned_table_1, 2 and reference_table_3 into right
|
||||
-- hand-side of the graph
|
||||
ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
BEGIN;
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- undistributing citus_local_table_1 cascades to partitioned tables too
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade' AND
|
||||
tablename LIKE 'partitioned_table_%';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
CREATE SCHEMA "bad!schemaName";
|
||||
CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE);
|
||||
CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!" ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1);
|
||||
-- test with weird schema, table & constraint names
|
||||
SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE;
|
|
@ -0,0 +1,81 @@
|
|||
\set VERBOSITY terse
|
||||
SET citus.next_shard_id TO 1517000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.enable_local_execution TO ON;
|
||||
CREATE SCHEMA undistribute_table_cascade_mx;
|
||||
SET search_path TO undistribute_table_cascade_mx;
|
||||
SET client_min_messages to ERROR;
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- ensure that we sync metadata to worker 1 & 2
|
||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- both workers should print 0 as we undistributed all relations in this schema
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx'
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,0)
|
||||
(localhost,57638,t,0)
|
||||
(2 rows)
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA undistribute_table_cascade_mx CASCADE;
|
|
@ -153,7 +153,7 @@ ORDER BY 1;
|
|||
function start_metadata_sync_to_node(text,integer)
|
||||
function stop_metadata_sync_to_node(text,integer)
|
||||
function truncate_local_data_after_distributing_table(regclass)
|
||||
function undistribute_table(regclass)
|
||||
function undistribute_table(regclass,boolean)
|
||||
function update_distributed_table_colocation(regclass,text)
|
||||
function worker_append_table_to_shard(text,text,text,integer)
|
||||
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
|
||||
|
|
|
@ -149,7 +149,7 @@ ORDER BY 1;
|
|||
function start_metadata_sync_to_node(text,integer)
|
||||
function stop_metadata_sync_to_node(text,integer)
|
||||
function truncate_local_data_after_distributing_table(regclass)
|
||||
function undistribute_table(regclass)
|
||||
function undistribute_table(regclass,boolean)
|
||||
function update_distributed_table_colocation(regclass,text)
|
||||
function worker_append_table_to_shard(text,text,text,integer)
|
||||
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
|
||||
|
|
|
@ -42,6 +42,7 @@ test: multi_mx_function_call_delegation
|
|||
test: multi_mx_modifications local_shard_execution
|
||||
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
|
||||
test: local_shard_copy
|
||||
test: undistribute_table_cascade_mx
|
||||
test: citus_local_tables_mx
|
||||
test: citus_local_tables_queries_mx
|
||||
test: multi_mx_transaction_recovery
|
||||
|
|
|
@ -322,7 +322,7 @@ test: replicate_reference_tables_to_coordinator
|
|||
test: coordinator_shouldhaveshards
|
||||
test: local_shard_utility_command_execution
|
||||
test: citus_local_tables
|
||||
test: multi_row_router_insert mixed_relkind_tests
|
||||
test: multi_row_router_insert mixed_relkind_tests undistribute_table_cascade
|
||||
|
||||
test: remove_coordinator
|
||||
|
||||
|
|
|
@ -576,6 +576,28 @@ ALTER TABLE test DROP CONSTRAINT foreign_key;
|
|||
SELECT undistribute_table('test_2');
|
||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
|
||||
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
|
||||
|
||||
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO test (x) VALUES ($1);
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
\set VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1515000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE SCHEMA undistribute_table_cascade;
|
||||
SET search_path TO undistribute_table_cascade;
|
||||
|
||||
SET client_min_messages to ERROR;
|
||||
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
CREATE TABLE reference_table_2 (col_1 INT UNIQUE, col_2 INT UNIQUE);
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE distributed_table_2 (col_1 INT UNIQUE);
|
||||
CREATE TABLE distributed_table_3 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
SELECT create_distributed_table('distributed_table_2', 'col_1');
|
||||
SELECT create_distributed_table('distributed_table_3', 'col_1');
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
CREATE TABLE citus_local_table_2 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
SELECT create_citus_local_table('citus_local_table_2');
|
||||
|
||||
-- --- ---
|
||||
-- | | | |
|
||||
-- | v | v
|
||||
-- distributed_table_2 -> distributed_table_1 -> reference_table_1 <- reference_table_2
|
||||
-- ^ | ^ |
|
||||
-- v | | v
|
||||
-- distributed_table_3 <------------ citus_local_table_1 citus_local_table_2
|
||||
--
|
||||
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES distributed_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1, col_2) REFERENCES reference_table_1(col_2, col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES citus_local_table_2(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES distributed_table_3(col_1);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_11 FOREIGN KEY (col_1) REFERENCES distributed_table_2(col_1);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_12 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
|
||||
-- show that all of below fails as we didn't provide cascade=true
|
||||
SELECT undistribute_table('distributed_table_1');
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>false);
|
||||
SELECT undistribute_table('reference_table_2');
|
||||
|
||||
-- In each of below transation blocks, show that we preserve foreign keys.
|
||||
-- Also show that we don't have any citus tables in current schema after
|
||||
-- undistribute_table(cascade).
|
||||
-- So in each transaction, both selects should return true.
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- show that we switch to sequential execution as there are
|
||||
-- reference tables in our subgraph
|
||||
show citus.multi_shard_modify_mode;
|
||||
|
||||
SELECT COUNT(*)=10 FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$';
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
|
||||
SELECT COUNT(*)=10 FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$';
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- print foreign keys only in one of xact blocks not to make tests too verbose
|
||||
SELECT conname, conrelid::regclass, confrelid::regclass
|
||||
FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname ~ '^fkey\_\d+$'
|
||||
ORDER BY conname;
|
||||
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT COUNT(*) FROM distributed_table_1;
|
||||
-- show that we error out as select is executed in parallel mode
|
||||
-- and there are reference tables in our subgraph
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
set citus.multi_shard_modify_mode to 'sequential';
|
||||
SELECT COUNT(*) FROM distributed_table_1;
|
||||
-- even if there are reference tables in our subgraph, show that
|
||||
-- we don't error out as we already switched to sequential execution
|
||||
SELECT undistribute_table('reference_table_1', cascade_via_foreign_keys=>true);
|
||||
ROLLBACK;
|
||||
|
||||
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_4;
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- as we splitted distributed_table_1,2 & 3 into a seperate subgraph
|
||||
-- by dropping reference_table_1, we should not switch to sequential
|
||||
-- execution
|
||||
show citus.multi_shard_modify_mode;
|
||||
ROLLBACK;
|
||||
|
||||
-- split distributed_table_2 & distributed_table_3 into a seperate foreign
|
||||
-- key subgraph then undistribute them
|
||||
ALTER TABLE distributed_table_2 DROP CONSTRAINT fkey_3;
|
||||
ALTER TABLE distributed_table_1 DROP CONSTRAINT fkey_8;
|
||||
SELECT undistribute_table('distributed_table_2', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- should return true as we undistributed those two tables
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade' AND
|
||||
(tablename='distributed_table_2' OR tablename='distributed_table_3');
|
||||
|
||||
-- other tables should stay as is since we splited those two tables
|
||||
SELECT COUNT(*)=5 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade';
|
||||
|
||||
-- test partitioned tables
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
|
||||
CREATE TABLE partitioned_table_2 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_2_100_200 PARTITION OF partitioned_table_2 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_2_200_300 PARTITION OF partitioned_table_2 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_2', 'col_1');
|
||||
|
||||
CREATE TABLE reference_table_3 (col_1 INT UNIQUE, col_2 INT UNIQUE);
|
||||
SELECT create_reference_table('reference_table_3');
|
||||
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_9 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
|
||||
ALTER TABLE partitioned_table_2 ADD CONSTRAINT fkey_10 FOREIGN KEY (col_1) REFERENCES reference_table_3(col_2);
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- show that we preserve foreign keys on partitions too
|
||||
SELECT conname, conrelid::regclass, confrelid::regclass
|
||||
FROM pg_constraint
|
||||
WHERE connamespace = (SELECT oid FROM pg_namespace WHERE nspname='undistribute_table_cascade') AND
|
||||
conname = 'fkey_9' OR conname = 'fkey_10'
|
||||
ORDER BY 1,2,3;
|
||||
ROLLBACK;
|
||||
|
||||
-- as pg < 12 doesn't support foreign keys between partitioned tables,
|
||||
-- define below foreign key conditionally instead of adding another
|
||||
-- test output
|
||||
DO $proc$
|
||||
BEGIN
|
||||
IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN
|
||||
EXECUTE
|
||||
$$
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_13 FOREIGN KEY (col_1) REFERENCES partitioned_table_2(col_1);
|
||||
$$;
|
||||
END IF;
|
||||
END$proc$;
|
||||
|
||||
BEGIN;
|
||||
-- For pg versions 11, 12 & 13, partitioned_table_1 references to reference_table_3
|
||||
-- and partitioned_table_2 references to reference_table_3.
|
||||
-- For pg versions > 11, partitioned_table_1 references to partitioned_table_2 as well.
|
||||
-- Anyway show that undistribute_table with cascade is fine.
|
||||
SELECT undistribute_table('partitioned_table_2', cascade_via_foreign_keys=>true);
|
||||
ROLLBACK;
|
||||
|
||||
-- now merge partitioned_table_1, 2 and reference_table_3 into right
|
||||
-- hand-side of the graph
|
||||
ALTER TABLE reference_table_3 ADD CONSTRAINT fkey_14 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
|
||||
BEGIN;
|
||||
SELECT undistribute_table('citus_local_table_1', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- undistributing citus_local_table_1 cascades to partitioned tables too
|
||||
SELECT COUNT(*)=0 FROM pg_dist_partition, pg_tables
|
||||
WHERE tablename=logicalrelid::regclass::text AND
|
||||
schemaname='undistribute_table_cascade' AND
|
||||
tablename LIKE 'partitioned_table_%';
|
||||
ROLLBACK;
|
||||
|
||||
CREATE SCHEMA "bad!schemaName";
|
||||
|
||||
CREATE TABLE "bad!schemaName"."LocalTabLE.1!?!"(col_1 INT UNIQUE);
|
||||
CREATE TABLE "bad!schemaName"."LocalTabLE.2!?!"(col_1 INT UNIQUE);
|
||||
|
||||
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.1!?!"');
|
||||
SELECT create_citus_local_table('"bad!schemaName"."LocalTabLE.2!?!"');
|
||||
|
||||
ALTER TABLE "bad!schemaName"."LocalTabLE.1!?!" ADD CONSTRAINT "bad!constraintName" FOREIGN KEY (col_1) REFERENCES "bad!schemaName"."LocalTabLE.2!?!"(col_1);
|
||||
|
||||
-- test with weird schema, table & constraint names
|
||||
SELECT undistribute_table('"bad!schemaName"."LocalTabLE.1!?!"', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA undistribute_table_cascade, "bad!schemaName" CASCADE;
|
|
@ -0,0 +1,48 @@
|
|||
\set VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1517000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.enable_local_execution TO ON;
|
||||
|
||||
CREATE SCHEMA undistribute_table_cascade_mx;
|
||||
SET search_path TO undistribute_table_cascade_mx;
|
||||
|
||||
SET client_min_messages to ERROR;
|
||||
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
|
||||
-- ensure that we sync metadata to worker 1 & 2
|
||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
||||
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
|
||||
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
||||
|
||||
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
||||
SELECT create_citus_local_table('citus_local_table_1');
|
||||
|
||||
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
||||
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
||||
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
||||
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
||||
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
||||
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
||||
|
||||
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
|
||||
|
||||
-- both workers should print 0 as we undistributed all relations in this schema
|
||||
SELECT run_command_on_workers(
|
||||
$$
|
||||
SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname='undistribute_table_cascade_mx'
|
||||
$$);
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA undistribute_table_cascade_mx CASCADE;
|
Loading…
Reference in New Issue