Tests for master_copy_shard_placement

pull/3591/head
Hadi Moshayedi 2020-03-16 16:24:44 -07:00
parent ede176d849
commit b46b9a68ae
5 changed files with 771 additions and 226 deletions

View File

@ -117,18 +117,17 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
ShardInterval *shardInterval = LoadShardInterval(shardId);
ErrorIfTableCannotBeReplicated(shardInterval->relationId);
if (!doRepair)
if (doRepair)
{
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort);
}
else
{
ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode);
}
else
{
/* RepairShardPlacement function repairs only given shard */
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort);
}
PG_RETURN_VOID();
}
@ -190,41 +189,38 @@ BlockWritesToShardList(List *shardList)
/*
* ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable
* for its shard being replicated. There are 2 cases in which shard replication is not
* allowed:
*
* 1) MX tables, since RF=1 is a must MX tables
* 2) Reference tables, since the shard should already exist in all workers
* for its shard being replicated. Shard replications is not allowed only for MX tables,
* since RF=1 is a must MX tables.
*/
static void
ErrorIfTableCannotBeReplicated(Oid relationId)
{
/*
* Note that ShouldSyncTableMetadata() returns true for both MX tables
* and reference tables.
*/
bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
if (shouldSyncMetadata)
if (!shouldSyncMetadata)
{
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
char *relationName = get_rel_name(relationId);
StringInfo errorDetailString = makeStringInfo();
return;
}
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
appendStringInfo(errorDetailString,
"Table %s is streaming replicated. Shards "
"of streaming replicated tables cannot "
"be copied", relationName);
}
else if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
appendStringInfo(errorDetailString, "Table %s is a reference table. Shards "
"of reference tables cannot be copied",
relationName);
return;
}
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
char *relationName = get_rel_name(relationId);
/*
* ShouldSyncTableMetadata() returns true also for reference table,
* we don't want to error in that case since reference tables aren't
* automatically replicated to active nodes with no shards, and
* master_copy_shard_placement() can be used to create placements in
* such nodes.
*/
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy shard"),
errdetail("%s", errorDetailString->data)));
(errmsg("Table %s is streaming replicated. Shards "
"of streaming replicated tables cannot "
"be copied", quote_literal_cstr(relationName)))));
}
}
@ -381,8 +377,8 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
/*
* ReplicateColocatedShardPlacement replicated given shard and its colocated shards
* from a source node to target node.
* ReplicateColocatedShardPlacement replicates the given shard and its
* colocated shards from a source node to target node.
*/
static void
ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
@ -394,13 +390,11 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedTableCell = NULL;
Oid colocatedTableId = InvalidOid;
ListCell *colocatedShardCell = NULL;
foreach(colocatedTableCell, colocatedTableList)
foreach_oid(colocatedTableId, colocatedTableList)
{
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
char relationKind = '\0';
/* check that user has owner rights in all co-located tables */
@ -411,8 +405,8 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot repair shard"),
errdetail("Table %s is a foreign table. Repairing "
errmsg("cannot replicate shard"),
errdetail("Table %s is a foreign table. Replicating "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
@ -441,6 +435,9 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
* deadlocks.
*/
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
BlockWritesToShardList(colocatedShardList);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
@ -454,8 +451,6 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
targetNodeName, targetNodePort);
}
BlockWritesToShardList(colocatedShardList);
/*
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.
@ -659,7 +654,8 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
if (targetPlacement != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("shard %ld already exist in target placement", shardId)));
errmsg("shard " INT64_FORMAT " already exists in the target node",
shardId)));
}
}

View File

@ -3,7 +3,14 @@ CREATE SCHEMA mcsp;
SET search_path TO mcsp;
SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replicatiOn_model TO 'statement';
SET citus.replication_model TO 'statement';
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE data (
key text primary key,
value text not null,
@ -33,6 +40,28 @@ INSERT INTO data VALUES ('key-1', 'value-1');
INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
-- verify we error out if no healthy placement exists at source
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
ERROR: could not find placement matching "localhost:xxxxx"
HINT: Confirm the placement still exists and try again.
-- verify we error out if source and destination are the same
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_2_port,
do_repair := false);
ERROR: shard xxxxx already exists in the target node
-- verify we error out if target already contains a healthy placement
SELECT master_copy_shard_placement(
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
ERROR: shard xxxxx already exists in the target node
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
@ -62,7 +91,33 @@ SELECT count(*) FROM history;
2
(1 row)
-- test we can not replicate MX tables
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
CREATE TABLE mx_table(a int);
SELECT create_distributed_table('mx_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('mx_table', '1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO ERROR;
DROP SCHEMA mcsp CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table data
drop cascades to table history

View File

@ -4,16 +4,21 @@ setup
{
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_repair_placement_vs_modification (x int, y int);
SELECT create_distributed_table('test_repair_placement_vs_modification', 'x');
SELECT get_shard_id_for_distribution_column('test_repair_placement_vs_modification', 5) INTO selected_shard;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_copy_placement_vs_modification (x int, y int);
SELECT create_distributed_table('test_copy_placement_vs_modification', 'x');
SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard;
}
teardown
{
DROP TABLE test_copy_placement_vs_modification;
DROP TABLE test_repair_placement_vs_modification;
DROP TABLE selected_shard;
DROP TABLE test_copy_placement_vs_modification;
}
session "s1"
@ -24,39 +29,69 @@ step "s1-begin"
SET LOCAL citus.select_opens_transaction_block TO off;
}
// since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
// since test_repair_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
// hence not all placements are cached
step "s1-load-cache"
{
TRUNCATE test_copy_placement_vs_modification;
TRUNCATE test_repair_placement_vs_modification;
}
step "s1-insert"
{
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
}
step "s1-update"
{
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5;
}
step "s1-delete"
{
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
DELETE FROM test_repair_placement_vs_modification WHERE x = 5;
}
step "s1-select"
{
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
}
step "s1-ddl"
{
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x);
}
step "s1-copy"
{
COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
}
step "s1-insert-copy-table"
{
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
}
step "s1-update-copy-table"
{
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
}
step "s1-delete-copy-table"
{
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
}
step "s1-select-copy-table"
{
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
}
step "s1-ddl-copy-table"
{
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
}
step "s1-copy-copy-table"
{
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
}
@ -83,6 +118,13 @@ step "s2-repair-placement"
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
}
step "s2-copy-placement"
{
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
'localhost', 57637, 'localhost', 57638,
do_repair := false, transfer_mode := 'block_writes');
}
step "s2-commit"
{
COMMIT;
@ -93,7 +135,7 @@ step "s2-print-content"
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -105,7 +147,7 @@ step "s2-print-index-count"
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
}
@ -126,3 +168,19 @@ permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-b
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content"
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count"
// verify that copy placement (do_repair := false) blocks other operations, except SELECT
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-update-copy-table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-delete-copy-table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-insert-copy-table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-copy-copy-table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-ddl-copy-table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-select-copy-table" "s2-commit" "s1-commit"
// verify that copy placement (do_repair := false) is blocked by other operations, except SELECT
permutation "s1-begin" "s2-begin" "s1-update-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-delete-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-insert-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-copy-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-ddl-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-select-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"

View File

@ -3,7 +3,10 @@ CREATE SCHEMA mcsp;
SET search_path TO mcsp;
SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replicatiOn_model TO 'statement';
SET citus.replication_model TO 'statement';
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
CREATE TABLE data (
key text primary key,
@ -28,6 +31,27 @@ INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
-- verify we error out if no healthy placement exists at source
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
-- verify we error out if source and destination are the same
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_2_port,
do_repair := false);
-- verify we error out if target already contains a healthy placement
SELECT master_copy_shard_placement(
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
@ -46,4 +70,22 @@ WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nod
SELECT count(*) FROM data;
SELECT count(*) FROM history;
-- test we can not replicate MX tables
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
CREATE TABLE mx_table(a int);
SELECT create_distributed_table('mx_table', 'a');
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('mx_table', '1'),
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false);
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SET client_min_messages TO ERROR;
DROP SCHEMA mcsp CASCADE;