Merge pull request #3742 from citusdata/fix_sync

Ensure metadata is synced on master_copy_shard_placement(..., do_repair := false)
Hadi Moshayedi 2020-04-13 12:57:11 -07:00 committed by GitHub
commit 4e3d402473
5 changed files with 299 additions and 92 deletions
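
In short, this change makes master_copy_shard_placement(..., do_repair := false) record the new placements with real placement IDs on the coordinator and upsert them on metadata workers, rather than inserting coordinator-only rows with INVALID_PLACEMENT_ID. A minimal sketch of the scenario being fixed, modeled on the regression tests below (the ports, ref_table, and the :ref_table_shard psql variable are the tests' own):

    -- with worker 1 holding synced metadata, replicate (not repair) a placement
    SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
    SELECT master_copy_shard_placement(
        :ref_table_shard,
        'localhost', :worker_1_port,
        'localhost', :worker_2_port,
        do_repair := false,
        transfer_mode := 'block_writes');
    -- before this commit the new placement was missing from the worker's
    -- pg_dist_placement; with it, the worker sees the same placement count
    SELECT result FROM run_command_on_workers(
        $$ SELECT count(*) FROM pg_dist_placement WHERE shardid = $$ || :ref_table_shard)
    WHERE nodeport = :worker_1_port;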


@@ -53,7 +53,7 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort,
char shardReplicationMode);
static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName,
static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort);
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
@@ -67,6 +67,8 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
int32 targetNodePort);
static List * RecreateTableDDLCommandList(Oid relationId);
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
static void EnsureTableListOwner(List *tableIdList);
static void EnsureTableListSuitableForReplication(List *tableIdList);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
@@ -383,57 +385,21 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
Oid colocatedTableId = InvalidOid;
ListCell *colocatedShardCell = NULL;
foreach_oid(colocatedTableId, colocatedTableList)
{
char relationKind = '\0';
/* check that user has owner rights in all co-located tables */
EnsureTableOwner(colocatedTableId);
relationKind = get_rel_relkind(colocatedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate shard"),
errdetail("Table %s is a foreign table. Replicating "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
colocatedTableId);
if (foreignConstraintCommandList != NIL &&
PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("This shard has foreign constraints on it. "
"Citus currently supports "
"foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with "
"other replication factors, please contact us at "
"https://citusdata.com/about/contact_us.")));
}
}
EnsureTableListOwner(colocatedTableList);
EnsureTableListSuitableForReplication(colocatedTableList);
/*
* We sort colocatedShardList so that lock operations will not cause any
* We sort shardIntervalList so that lock operations will not cause any
* deadlocks.
*/
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
BlockWritesToShardList(colocatedShardList);
foreach(colocatedShardCell, colocatedShardList)
ShardInterval *colocatedShard = NULL;
foreach_ptr(colocatedShard, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
/*
@@ -457,12 +423,89 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
EnsureReferenceTablesExistOnAllNodes();
}
/*
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.
*/
CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
/*
* Finally insert the placements into pg_dist_placement and sync them to the
* metadata workers.
*/
foreach_ptr(colocatedShard, colocatedShardList)
{
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
uint64 placementId = GetNextPlacementId();
InsertShardPlacementRow(colocatedShardId, placementId,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
if (ShouldSyncTableMetadata(colocatedShard->relationId))
{
char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
SHARD_STATE_ACTIVE, 0,
groupId);
SendCommandToWorkersWithMetadata(placementCommand);
}
}
}
/*
* EnsureTableListOwner ensures current user owns given tables. Superusers
* are regarded as owners.
*/
static void
EnsureTableListOwner(List *tableIdList)
{
Oid tableId = InvalidOid;
foreach_oid(tableId, tableIdList)
{
EnsureTableOwner(tableId);
}
}
/*
* EnsureTableListSuitableForReplication errors out if given tables are not
* suitable for replication.
*/
static void
EnsureTableListSuitableForReplication(List *tableIdList)
{
Oid tableId = InvalidOid;
foreach_oid(tableId, tableIdList)
{
char relationKind = get_rel_relkind(tableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(tableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate shard"),
errdetail("Table %s is a foreign table. Replicating "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
List *foreignConstraintCommandList =
GetTableForeignConstraintCommands(tableId);
if (foreignConstraintCommandList != NIL &&
PartitionMethod(tableId) != DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("This shard has foreign constraints on it. "
"Citus currently supports "
"foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with "
"other replication factors, please contact us at "
"https://citusdata.com/about/contact_us.")));
}
}
}
@@ -473,28 +516,25 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
* necessary.
*/
static void
CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
ShardInterval *shardInterval = NULL;
/* iterate through the colocated shards and copy each */
foreach(colocatedShardCell, colocatedShardList)
foreach_ptr(shardInterval, shardIntervalList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
bool includeDataCopy = true;
if (PartitionedTable(colocatedShard->relationId))
if (PartitionedTable(shardInterval->relationId))
{
/* partitioned tables contain no data */
includeDataCopy = false;
}
List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName,
sourceNodePort, includeDataCopy);
char *tableOwner = TableOwner(colocatedShard->relationId);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
@@ -517,25 +557,23 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod
* the copied shard would get updated twice because of a cascading DML coming
* from both of the placements.
*/
foreach(colocatedShardCell, colocatedShardList)
foreach_ptr(shardInterval, shardIntervalList)
{
List *colocatedShardForeignConstraintCommandList = NIL;
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
char *tableOwner = TableOwner(colocatedShard->relationId);
char *tableOwner = TableOwner(shardInterval->relationId);
CopyShardForeignConstraintCommandListGrouped(colocatedShard,
&
colocatedShardForeignConstraintCommandList,
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
List *commandList = list_concat(colocatedShardForeignConstraintCommandList,
List *commandList = list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList);
if (PartitionTable(colocatedShard->relationId))
if (PartitionTable(shardInterval->relationId))
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(colocatedShard);
GenerateAttachShardPartitionCommand(shardInterval);
commandList = lappend(commandList, attachPartitionCommand);
}
@@ -543,18 +581,6 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, commandList);
}
/* finally insert the placements to pg_dist_placement */
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
}
}
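
The net effect of the new tail of ReplicateColocatedShardPlacement above: every copied shard gains a pg_dist_placement row with a fresh placement ID on the coordinator, and, whenever ShouldSyncTableMetadata() holds for its table, the same row is upserted on each metadata worker through SendCommandToWorkersWithMetadata. One way to compare the two sides, sketched against the same test objects used in the regression tests below:

    -- the coordinator's placements for the replicated shard
    SELECT placementid, shardid, shardstate, groupid
    FROM pg_dist_placement
    WHERE shardid = :ref_table_shard;
    -- the same catalog as each worker sees it; on a metadata worker the
    -- row count should now match the coordinator's
    SELECT nodeport, result FROM run_command_on_workers(
        $$ SELECT count(*) FROM pg_dist_placement WHERE shardid = $$ || :ref_table_shard);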


@@ -4,7 +4,7 @@ SET search_path TO mcsp;
SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'statement';
CREATE TABLE ref_table(a int);
CREATE TABLE ref_table(a int, b text unique);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
@@ -62,6 +62,18 @@ SELECT master_copy_shard_placement(
'localhost', :worker_2_port,
do_repair := false);
ERROR: shard xxxxx already exists in the target node
-- verify we error out if table has foreign key constraints
INSERT INTO ref_table SELECT 1, value FROM data;
ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
do_repair := false);
ERROR: cannot create foreign key constraint
DETAIL: This shard has foreign constraints on it. Citus currently supports foreign key constraints only for "citus.shard_replication_factor = 1".
HINT: Please change "citus.shard_replication_factor" to 1. To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us.
ALTER TABLE data DROP CONSTRAINT distfk;
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),


@@ -848,7 +848,8 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
(1 row)
SET client_min_messages TO WARNING;
SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset
-- remove reference table replica from worker 2
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
@@ -856,7 +857,7 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
-1
@@ -871,7 +872,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
0
@@ -890,7 +891,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
-1
@@ -902,7 +903,7 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
1
(1 row)
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
?column?
---------------------------------------------------------------------
0
@@ -914,6 +915,105 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
t
(1 row)
-- test that metadata is synced when master_copy_shard_placement replicates
-- reference table shards
SET citus.replicate_reference_tables_on_activate TO off;
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET citus.replication_model TO streaming;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT master_copy_shard_placement(
:ref_table_shard,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false,
transfer_mode := 'block_writes');
master_copy_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
?column?
---------------------------------------------------------------------
0
(1 row)
-- test that metadata is synced on replicate_reference_tables
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT replicate_reference_tables();
replicate_reference_tables
---------------------------------------------------------------------
(1 row)
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
?column?
---------------------------------------------------------------------
0
(1 row)
-- join the reference table with a distributed table from worker 1
-- to verify that metadata for worker 2 placements has been synced
-- to worker 1.
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i;
TRUNCATE ref_table;
INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i;
\c - - - :worker_1_port
SET search_path TO replicate_reference_table;
SELECT array_agg(dist_table.b ORDER BY ref_table.a)
FROM ref_table, dist_table
WHERE ref_table.a = dist_table.a;
array_agg
---------------------------------------------------------------------
{4,16,36,64,100}
(1 row)
\c - - - :master_port
SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR and verbosity to terse to suppress
-- OS-dependent host name resolution warnings


@@ -5,7 +5,7 @@ SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'statement';
CREATE TABLE ref_table(a int);
CREATE TABLE ref_table(a int, b text unique);
SELECT create_reference_table('ref_table');
CREATE TABLE data (
@@ -52,6 +52,18 @@ SELECT master_copy_shard_placement(
'localhost', :worker_2_port,
do_repair := false);
-- verify we error out if table has foreign key constraints
INSERT INTO ref_table SELECT 1, value FROM data;
ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
do_repair := false);
ALTER TABLE data DROP CONSTRAINT distfk;
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),


@@ -551,32 +551,89 @@ SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('r
SET client_min_messages TO WARNING;
SELECT count(*) AS ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT shardid AS ref_table_shard FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass \gset
SELECT count(*) AS ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard \gset
-- remove reference table replica from worker 2
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
-- test setting citus.replicate_reference_tables_on_activate to on
-- master_add_node
SET citus.replicate_reference_tables_on_activate TO on;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
-- master_activate_node
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT count(*) - :ref_table_placements FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
SELECT count(*) - :ref_table_placements FROM pg_dist_shard_placement WHERE shardid = :ref_table_shard;
SELECT min(result) = max(result) AS consistent FROM run_command_on_placements('ref_table', 'SELECT sum(a) FROM %s');
-- test that metadata is synced when master_copy_shard_placement replicates
-- reference table shards
SET citus.replicate_reference_tables_on_activate TO off;
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SET citus.replication_model TO streaming;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT master_copy_shard_placement(
:ref_table_shard,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
do_repair := false,
transfer_mode := 'block_writes');
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
-- test that metadata is synced on replicate_reference_tables
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT replicate_reference_tables();
SELECT result::int - :ref_table_placements
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
WHERE nodeport=:worker_1_port;
-- join the reference table with a distributed table from worker 1
-- to verify that metadata for worker 2 placements has been synced
-- to worker 1.
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT i, i * i FROM generate_series(1, 20) i;
TRUNCATE ref_table;
INSERT INTO ref_table SELECT 2 * i FROM generate_series(1, 5) i;
\c - - - :worker_1_port
SET search_path TO replicate_reference_table;
SELECT array_agg(dist_table.b ORDER BY ref_table.a)
FROM ref_table, dist_table
WHERE ref_table.a = dist_table.a;
\c - - - :master_port
SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR and verbosity to terse to suppress
-- OS-dependent host name resolution warnings