From 2218b7e38d977ebf0171cdf49cf00975f8f280ca Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Apr 2020 11:31:35 -0700 Subject: [PATCH 1/3] Refactor ReplicateColocatedShardPlacement --- .../distributed/master/master_repair_shards.c | 177 ++++++++++-------- 1 file changed, 95 insertions(+), 82 deletions(-) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index de0c20d4e..70dba5e2a 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -53,9 +53,9 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode); -static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); +static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort); static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName, int32 sourceNodePort); @@ -67,6 +67,8 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); +static void EnsureTableListOwner(List *tableIdList); +static void EnsureTableListSuitableForReplication(List *tableIdList); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_copy_shard_placement); @@ -383,57 +385,21 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, List *colocatedTableList = ColocatedTableList(distributedTableId); List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - Oid colocatedTableId = InvalidOid; - ListCell *colocatedShardCell = NULL; - foreach_oid(colocatedTableId, colocatedTableList) - { - char relationKind = '\0'; - - /* check that user has owner rights in all co-located tables */ - EnsureTableOwner(colocatedTableId); - - relationKind = get_rel_relkind(colocatedTableId); - if (relationKind == RELKIND_FOREIGN_TABLE) - { - char *relationName = get_rel_name(colocatedTableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot replicate shard"), - errdetail("Table %s is a foreign table. Replicating " - "shards backed by foreign tables is " - "not supported.", relationName))); - } - - List *foreignConstraintCommandList = GetTableForeignConstraintCommands( - colocatedTableId); - - if (foreignConstraintCommandList != NIL && - PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot create foreign key constraint"), - errdetail("This shard has foreign constraints on it. " - "Citus currently supports " - "foreign key constraints only for " - "\"citus.shard_replication_factor = 1\"."), - errhint("Please change \"citus.shard_replication_factor to " - "1\". To learn more about using foreign keys with " - "other replication factors, please contact us at " - "https://citusdata.com/about/contact_us."))); - } - } + EnsureTableListOwner(colocatedTableList); + EnsureTableListSuitableForReplication(colocatedTableList); /* - * We sort colocatedShardList so that lock operations will not cause any + * We sort shardIntervalList so that lock operations will not cause any * deadlocks. 
*/ colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); BlockWritesToShardList(colocatedShardList); - foreach(colocatedShardCell, colocatedShardList) + ShardInterval *colocatedShard = NULL; + foreach_ptr(colocatedShard, colocatedShardList) { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); uint64 colocatedShardId = colocatedShard->shardId; /* @@ -457,12 +423,76 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, EnsureReferenceTablesExistOnAllNodes(); } - /* - * CopyColocatedShardPlacement function copies given shard with its co-located - * shards. - */ - CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + + /* finally insert the placements to pg_dist_placement */ + foreach_ptr(colocatedShard, colocatedShardList) + { + uint64 colocatedShardId = colocatedShard->shardId; + uint32 groupId = GroupForNode(targetNodeName, targetNodePort); + + InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), + groupId); + } +} + + +/* + * EnsureTableListOwner ensures current user owns given tables. Superusers + * are regarded as owners. + */ +static void +EnsureTableListOwner(List *tableIdList) +{ + Oid tableId = InvalidOid; + foreach_oid(tableId, tableIdList) + { + EnsureTableOwner(tableId); + } +} + + +/* + * EnsureTableListSuitableForReplication errors out if given tables are not + * suitable for replication. + */ +static void +EnsureTableListSuitableForReplication(List *tableIdList) +{ + Oid tableId = InvalidOid; + foreach_oid(tableId, tableIdList) + { + char relationKind = get_rel_relkind(tableId); + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(tableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate shard"), + errdetail("Table %s is a foreign table. Replicating " + "shards backed by foreign tables is " + "not supported.", relationName))); + } + + List *foreignConstraintCommandList = + GetTableForeignConstraintCommands(tableId); + + if (foreignConstraintCommandList != NIL && + PartitionMethod(tableId) != DISTRIBUTE_BY_NONE) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create foreign key constraint"), + errdetail("This shard has foreign constraints on it. " + "Citus currently supports " + "foreign key constraints only for " + "\"citus.shard_replication_factor = 1\"."), + errhint("Please change \"citus.shard_replication_factor to " + "1\". To learn more about using foreign keys with " + "other replication factors, please contact us at " + "https://citusdata.com/about/contact_us."))); + } + } } @@ -473,28 +503,25 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, * necessary. 
*/ static void -CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort) +CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort) { - ShardInterval *shardInterval = LoadShardInterval(shardId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - ListCell *colocatedShardCell = NULL; + ShardInterval *shardInterval = NULL; /* iterate through the colocated shards and copy each */ - foreach(colocatedShardCell, colocatedShardList) + foreach_ptr(shardInterval, shardIntervalList) { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); bool includeDataCopy = true; - if (PartitionedTable(colocatedShard->relationId)) + if (PartitionedTable(shardInterval->relationId)) { /* partitioned tables contain no data */ includeDataCopy = false; } - List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName, + List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeDataCopy); - char *tableOwner = TableOwner(colocatedShard->relationId); + char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -517,25 +544,23 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod * the copied shard would get updated twice because of a cascading DML coming * from both of the placements. */ - foreach(colocatedShardCell, colocatedShardList) + foreach_ptr(shardInterval, shardIntervalList) { - List *colocatedShardForeignConstraintCommandList = NIL; + List *shardForeignConstraintCommandList = NIL; List *referenceTableForeignConstraintList = NIL; - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - char *tableOwner = TableOwner(colocatedShard->relationId); + char *tableOwner = TableOwner(shardInterval->relationId); - CopyShardForeignConstraintCommandListGrouped(colocatedShard, - & - colocatedShardForeignConstraintCommandList, + CopyShardForeignConstraintCommandListGrouped(shardInterval, + &shardForeignConstraintCommandList, &referenceTableForeignConstraintList); - List *commandList = list_concat(colocatedShardForeignConstraintCommandList, + List *commandList = list_concat(shardForeignConstraintCommandList, referenceTableForeignConstraintList); - if (PartitionTable(colocatedShard->relationId)) + if (PartitionTable(shardInterval->relationId)) { char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(colocatedShard); + GenerateAttachShardPartitionCommand(shardInterval); commandList = lappend(commandList, attachPartitionCommand); } @@ -543,18 +568,6 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, commandList); } - - /* finally insert the placements to pg_dist_placement */ - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - uint64 colocatedShardId = colocatedShard->shardId; - uint32 groupId = GroupForNode(targetNodeName, targetNodePort); - - InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, - SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), - groupId); - } } From f9de734329b916fc6d576d8ffc16518a34b8e00f Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Apr 2020 11:32:06 -0700 Subject: 
[PATCH 2/3] Ensure metadata is synced on ReplicateColocatedShardPlacement --- .../distributed/master/master_repair_shards.c | 17 ++- .../multi_replicate_reference_table.out | 110 +++++++++++++++++- .../sql/multi_replicate_reference_table.sql | 67 ++++++++++- 3 files changed, 182 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 70dba5e2a..850e65c58 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -426,15 +426,28 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); - /* finally insert the placements to pg_dist_placement */ + /* + * Finally insert the placements to pg_dist_placement and sync it to the + * metadata workers. + */ foreach_ptr(colocatedShard, colocatedShardList) { uint64 colocatedShardId = colocatedShard->shardId; uint32 groupId = GroupForNode(targetNodeName, targetNodePort); + uint64 placementId = GetNextPlacementId(); - InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + InsertShardPlacementRow(colocatedShardId, placementId, SHARD_STATE_ACTIVE, ShardLength(colocatedShardId), groupId); + + if (ShouldSyncTableMetadata(colocatedShard->relationId)) + { + char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId, + SHARD_STATE_ACTIVE, 0, + groupId); + + SendCommandToWorkersWithMetadata(placementCommand); + } } } diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index bf7c9377c..bcbc634d2 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -848,7 +848,8 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r (1 row) SET client_min_messages TO WARNING; -SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset -- remove reference table replica from worker 2 SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? @@ -856,7 +857,7 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- -1 @@ -871,7 +872,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? 
--------------------------------------------------------------------- 0 @@ -890,7 +891,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- -1 @@ -902,7 +903,7 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port); 1 (1 row) -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; ?column? --------------------------------------------------------------------- 0 @@ -914,6 +915,105 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r t (1 row) +-- test that metadata is synced when master_copy_shard_placement replicates +-- reference table shards +SET citus.replicate_reference_tables_on_activate TO off; +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.replication_model TO streaming; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT master_copy_shard_placement( + :ref_table_shard, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + master_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + ?column? +--------------------------------------------------------------------- + 0 +(1 row) + +-- test that metadata is synced on replicate_reference_tables +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT replicate_reference_tables(); + replicate_reference_tables +--------------------------------------------------------------------- + +(1 row) + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + ?column? +--------------------------------------------------------------------- + 0 +(1 row) + +-- join the reference table with a distributed table from worker 1 +-- to verify that metadata for worker 2 placements have been synced +-- to worker 1. 
+CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i; +TRUNCATE ref_table; +INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i; +\c - - - :worker_1_port +SET search_path TO replicate_reference_table; +SELECT array_agg(dist_table.b ORDER BY ref_table.a) +FROM ref_table, dist_table +WHERE ref_table.a = dist_table.a; + array_agg +--------------------------------------------------------------------- + {4,16,36,64,100} +(1 row) + +\c - - - :master_port +SET search_path TO replicate_reference_table; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + -- test adding an invalid node while we have reference tables to replicate -- set client message level to ERROR and verbosity to terse to supporess -- OS-dependent host name resolution warnings diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 7b1951bfb..4535aebfc 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -551,32 +551,89 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r SET client_min_messages TO WARNING; -SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset +SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset + +SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset -- remove reference table replica from worker 2 SELECT 1 FROM master_remove_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; -- test setting citus.replicate_reference_tables_on_activate to on -- master_add_node SET citus.replicate_reference_tables_on_activate TO on; SELECT 1 FROM master_add_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; -- master_activate_node SELECT 1 FROM master_remove_node('localhost', :worker_2_port); SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; SELECT 1 FROM master_activate_node('localhost', :worker_2_port); -SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; +SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard; SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('ref_table', 'SELECT sum(a) FROM %s'); +-- test that metadata is 
synced when master_copy_shard_placement replicates +-- reference table shards +SET citus.replicate_reference_tables_on_activate TO off; +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +SET citus.replication_model TO streaming; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +SELECT master_copy_shard_placement( + :ref_table_shard, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false, + transfer_mode := 'block_writes'); + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + +-- test that metadata is synced on replicate_reference_tables +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +SELECT replicate_reference_tables(); + +SELECT result::int - :ref_table_placements +FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''') +WHERE nodeport=:worker_1_port; + +-- join the reference table with a distributed table from worker 1 +-- to verify that metadata for worker 2 placements have been synced +-- to worker 1. + +CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); +INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i; + +TRUNCATE ref_table; +INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i; + +\c - - - :worker_1_port + +SET search_path TO replicate_reference_table; + +SELECT array_agg(dist_table.b ORDER BY ref_table.a) +FROM ref_table, dist_table +WHERE ref_table.a = dist_table.a; + +\c - - - :master_port + +SET search_path TO replicate_reference_table; + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + -- test adding an invalid node while we have reference tables to replicate -- set client message level to ERROR and verbosity to terse to supporess -- OS-dependent host name resolution warnings From 2639a9a19d825835fd639e6ea59a2ae525dc9cee Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 13 Apr 2020 12:45:27 -0700 Subject: [PATCH 3/3] Test master_copy_shard_placement errors on foreign constraints --- .../expected/master_copy_shard_placement.out | 14 +++++++++++++- .../regress/sql/master_copy_shard_placement.sql | 14 +++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/test/regress/expected/master_copy_shard_placement.out b/src/test/regress/expected/master_copy_shard_placement.out index b39be6d82..f8e37fa39 100644 --- a/src/test/regress/expected/master_copy_shard_placement.out +++ b/src/test/regress/expected/master_copy_shard_placement.out @@ -4,7 +4,7 @@ SET search_path TO mcsp; SET citus.next_shard_id TO 8139000; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'statement'; -CREATE TABLE ref_table(a int); +CREATE TABLE ref_table(a int, b text unique); SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- @@ -62,6 +62,18 @@ SELECT master_copy_shard_placement( 'localhost', :worker_2_port, do_repair := false); ERROR: shard xxxxx already exists in the target node +-- verify we error out if table has foreign key constraints +INSERT INTO ref_table 
SELECT 1, value FROM data; +ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL; +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_1_port, + do_repair := false); +ERROR: cannot create foreign key constraint +DETAIL: This shard has foreign constraints on it. Citus currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. +ALTER TABLE data DROP CONSTRAINT distfk; -- replicate shard that contains key-1 SELECT master_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'), diff --git a/src/test/regress/sql/master_copy_shard_placement.sql b/src/test/regress/sql/master_copy_shard_placement.sql index dee374809..6d0f2234c 100644 --- a/src/test/regress/sql/master_copy_shard_placement.sql +++ b/src/test/regress/sql/master_copy_shard_placement.sql @@ -5,7 +5,7 @@ SET citus.next_shard_id TO 8139000; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'statement'; -CREATE TABLE ref_table(a int); +CREATE TABLE ref_table(a int, b text unique); SELECT create_reference_table('ref_table'); CREATE TABLE data ( @@ -52,6 +52,18 @@ SELECT master_copy_shard_placement( 'localhost', :worker_2_port, do_repair := false); +-- verify we error out if table has foreign key constraints +INSERT INTO ref_table SELECT 1, value FROM data; + +ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL; +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('data', 'key-1'), + 'localhost', :worker_2_port, + 'localhost', :worker_1_port, + do_repair := false); + +ALTER TABLE data DROP CONSTRAINT distfk; + -- replicate shard that contains key-1 SELECT master_copy_shard_placement( get_shard_id_for_distribution_column('data', 'key-1'),
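
The SQL below is a minimal, hand-run sketch (not part of the patches above) of the behaviour PATCH 2/3 adds: after master_copy_shard_placement copies a reference table shard, the new pg_dist_placement row is also pushed to metadata (MX) workers. It closely mirrors the multi_replicate_reference_table test added in that patch and assumes the same environment: a psql session with :worker_1_port and :worker_2_port set, the existing ref_table from that test, and a worker 2 that holds no other placements so master_remove_node succeeds. Object names and ports are illustrative only.

-- start from a state where ref_table is placed only on worker 1:
-- drop worker 2 and re-add it without re-replicating reference tables
SET citus.replicate_reference_tables_on_activate TO off;
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);

-- make worker 1 a metadata (MX) worker
SET citus.replication_model TO streaming;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);

-- copy the reference table shard from worker 1 to worker 2
SELECT master_copy_shard_placement(
    (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass),
    'localhost', :worker_1_port,
    'localhost', :worker_2_port,
    do_repair := false,
    transfer_mode := 'block_writes');

-- with PATCH 2/3 applied, worker 1's copy of pg_dist_placement should now
-- also contain the placement that was just created on worker 2
SELECT result FROM run_command_on_workers(
    'SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c
     WHERE a.shardid = b.shardid AND b.logicalrelid = c.oid AND c.relname = ''ref_table''')
WHERE nodeport = :worker_1_port;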