Ensure metadata is synced on ReplicateColocatedShardPlacement

pull/3742/head
Hadi Moshayedi 2020-04-09 11:32:06 -07:00
parent 2218b7e38d
commit f9de734329
3 changed files with 182 additions and 12 deletions

View File

@ -426,15 +426,28 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
/* finally insert the placements to pg_dist_placement */
/*
* Finally insert the placements to pg_dist_placement and sync it to the
* metadata workers.
*/
foreach_ptr(colocatedShard, colocatedShardList)
{
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
uint64 placementId = GetNextPlacementId();
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
InsertShardPlacementRow(colocatedShardId, placementId,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
if (ShouldSyncTableMetadata(colocatedShard->relationId))
{
char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
SHARD_STATE_ACTIVE, 0,
groupId);
SendCommandToWorkersWithMetadata(placementCommand);
}
}
}

View File

@ -848,7 +848,8 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
(1 row)
SET client_min_messages TO WARNING;
SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset
-- remove reference table replica from worker 2
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
@ -856,7 +857,7 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
-1
@ -871,7 +872,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
0
@ -890,7 +891,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
-1
@ -902,7 +903,7 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
0
@ -914,6 +915,105 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
t
(1 row)
-- test that metadata is synced when master_copy_shard_placement replicates
-- reference table shards
SET citus.replicate_reference_tables_on_activate TO off;
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET citus.replication_model TO streaming;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT master_copy_shard_placement(
:ref_table_shard,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false,
transfer_mode := 'block_writes');
master_copy_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
?column?
---------------------------------------------------------------------
0
(1 row)
-- test that metadata is synced on replicate_reference_tables
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT replicate_reference_tables();
replicate_reference_tables
---------------------------------------------------------------------
(1 row)
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
?column?
---------------------------------------------------------------------
0
(1 row)
-- join the reference table with a distributed table from worker 1
-- to verify that metadata for worker 2 placements have been synced
-- to worker 1.
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i;
TRUNCATE ref_table;
INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i;
\c - - - :worker_1_port
SET search_path TO replicate_reference_table;
SELECT array_agg(dist_table.b ORDER BY ref_table.a)
FROM ref_table, dist_table
WHERE ref_table.a = dist_table.a;
array_agg
---------------------------------------------------------------------
{4,16,36,64,100}
(1 row)
\c - - - :master_port
SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR and verbosity to terse to supporess
-- OS-dependent host name resolution warnings

View File

@ -551,32 +551,89 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
SET client_min_messages TO WARNING;
SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset
-- remove reference table replica from worker 2
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
-- test setting citus.replicate_reference_tables_on_activate to on
-- master_add_node
SET citus.replicate_reference_tables_on_activate TO on;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
-- master_activate_node
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('ref_table', 'SELECT sum(a) FROM %s');
-- test that metadata is synced when master_copy_shard_placement replicates
-- reference table shards
SET citus.replicate_reference_tables_on_activate TO off;
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SET citus.replication_model TO streaming;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT master_copy_shard_placement(
:ref_table_shard,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false,
transfer_mode := 'block_writes');
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
-- test that metadata is synced on replicate_reference_tables
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT replicate_reference_tables();
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
-- join the reference table with a distributed table from worker 1
-- to verify that metadata for worker 2 placements have been synced
-- to worker 1.
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i;
TRUNCATE ref_table;
INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i;
\c - - - :worker_1_port
SET search_path TO replicate_reference_table;
SELECT array_agg(dist_table.b ORDER BY ref_table.a)
FROM ref_table, dist_table
WHERE ref_table.a = dist_table.a;
\c - - - :master_port
SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR and verbosity to terse to supporess
-- OS-dependent host name resolution warnings