Implement shard placement copying

pull/3591/head
Marco Slot 2020-03-07 22:24:44 +01:00 committed by Hadi Moshayedi
parent f77c71a9bd
commit ede176d849
6 changed files with 442 additions and 58 deletions
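
At a glance, this change lets master_copy_shard_placement() add a new copy of a shard placement (together with all of its colocated placements) when called with do_repair := false, rather than only repairing an existing inactive placement. A minimal usage sketch based on the regression test added below; the worker ports are placeholders:

SELECT master_copy_shard_placement(
    get_shard_id_for_distribution_column('data', 'key-1'),
    'localhost', 57638,  -- source node
    'localhost', 57637,  -- target node
    do_repair := false);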

View File

@@ -50,15 +50,26 @@
/* local function forward declarations */
static char LookupShardTransferMode(Oid shardReplicationModeOid);
static void ErrorIfTableCannotBeReplicated(Oid relationId);
static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
int32 sourceNodePort, const char *targetNodeName,
int32 targetNodePort);
static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort,
char shardReplicationMode);
static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort);
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
const char *sourceNodeName,
int32 sourceNodePort);
static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
int32 sourceNodePort, const char *targetNodeName,
int32 targetNodePort);
static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
int32 sourceNodePort, const char *targetNodeName,
int32 targetNodePort);
static List * RecreateTableDDLCommandList(Oid relationId);
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
@@ -87,31 +98,37 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
int32 targetNodePort = PG_GETARG_INT32(4);
bool doRepair = PG_GETARG_BOOL(5);
Oid shardReplicationModeOid = PG_GETARG_OID(6);
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *targetNodeName = text_to_cstring(targetNodeNameText);
CheckCitusVersion(ERROR);
EnsureCoordinator();
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("using logical replication in "
"master_copy_shard_placement() requires Citus "
"Enterprise")));
}
ShardInterval *shardInterval = LoadShardInterval(shardId);
ErrorIfTableCannotBeReplicated(shardInterval->relationId);
if (!doRepair)
{
ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode);
}
else
{
/* RepairShardPlacement function repairs only given shard */
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort);
}
PG_RETURN_VOID();
}
@@ -171,6 +188,47 @@ BlockWritesToShardList(List *shardList)
}
/*
* ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable
* for its shard being replicated. There are 2 cases in which shard replication is not
* allowed:
*
1) MX tables, since replication factor 1 is a must for MX tables
* 2) Reference tables, since the shard should already exist in all workers
*/
static void
ErrorIfTableCannotBeReplicated(Oid relationId)
{
bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
if (shouldSyncMetadata)
{
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
char *relationName = get_rel_name(relationId);
StringInfo errorDetailString = makeStringInfo();
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
appendStringInfo(errorDetailString,
"Table %s is streaming replicated. Shards "
"of streaming replicated tables cannot "
"be copied", relationName);
}
else if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
appendStringInfo(errorDetailString, "Table %s is a reference table. Shards "
"of reference tables cannot be copied",
relationName);
}
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy shard"),
errdetail("%s", errorDetailString->data)));
}
}
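
For illustration, attempting to copy a placement of a reference table (or of a streaming-replicated MX table) now fails with the error constructed above. A hedged sketch of the reference-table case; the shard ID, ports, and table name are hypothetical, while the messages come from the ereport() above:

SELECT master_copy_shard_placement(8139100, 'localhost', 57638, 'localhost', 57637,
                                   do_repair := false);
-- ERROR:  cannot copy shard
-- DETAIL:  Table ref_table is a reference table. Shards of reference tables cannot be copied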
/*
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
* values to a char.
@@ -217,6 +275,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
char relationKind = get_rel_relkind(distributedTableId);
char *tableOwner = TableOwner(shardInterval->relationId);
bool missingOk = false;
/* prevent table from being dropped */
@@ -314,13 +373,190 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
/* after successful repair, we update shard state as healthy*/
List *placementList = ShardPlacementList(shardId);
ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName,
targetNodePort,
missingOk);
UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
}
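
As context for the repair and copy paths: placement health is tracked in the shardstate column exposed via pg_dist_shard_placement, which the new regression test also updates directly. A quick way to inspect it (the shard ID below is a placeholder; state 1 means active/healthy, 3 means inactive):

SELECT placementid, shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid = 8139000;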
/*
* ReplicateColocatedShardPlacement replicates the given shard and its colocated shards
* from a source node to a target node.
*/
static void
ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, char shardReplicationMode)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid distributedTableId = shardInterval->relationId;
List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedTableCell = NULL;
ListCell *colocatedShardCell = NULL;
foreach(colocatedTableCell, colocatedTableList)
{
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
char relationKind = '\0';
/* check that user has owner rights in all co-located tables */
EnsureTableOwner(colocatedTableId);
relationKind = get_rel_relkind(colocatedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot repair shard"),
errdetail("Table %s is a foreign table. Repairing "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
colocatedTableId);
if (foreignConstraintCommandList != NIL &&
PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("This shard has foreign constraints on it. "
"Citus currently supports "
"foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with "
"other replication factors, please contact us at "
"https://citusdata.com/about/contact_us.")));
}
}
/*
* We sort colocatedShardList so that lock operations will not cause any
* deadlocks.
*/
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
/*
* For shard copy, there should be a healthy placement on the source node and no
* placement on the target node.
*/
EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
}
BlockWritesToShardList(colocatedShardList);
/*
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.
*/
CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
}
/*
* CopyColocatedShardPlacement copies a shard along with its co-located shards from a
* source node to a target node. It does not make any checks about the state of the
* shards; it is the caller's responsibility to make those checks if they are
* necessary.
*/
static void
CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
/* iterate through the colocated shards and copy each */
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
bool includeDataCopy = true;
if (PartitionedTable(colocatedShard->relationId))
{
/* partitioned tables contain no data */
includeDataCopy = false;
}
List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
sourceNodePort, includeDataCopy);
char *tableOwner = TableOwner(colocatedShard->relationId);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}
/*
* Once all shards are created, we can recreate relationships between shards.
*
* Iterate through the colocated shards and create the foreign constraints and
* attach child tables to their parents in a partitioning hierarchy.
*
* Note: After implementing foreign constraints from distributed to reference
* tables, we have decided to not create foreign constraints from hash
* distributed to reference tables at this stage for nonblocking rebalancer.
* We just create the co-located ones here. We add the foreign constraints
* from hash distributed to reference tables after being completely done with
* the copy procedure inside LogicallyReplicateShards. The reason is that,
* the reference tables have placements in both source and target workers and
* the copied shard would get updated twice because of a cascading DML coming
* from both of the placements.
*/
foreach(colocatedShardCell, colocatedShardList)
{
List *colocatedShardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
char *tableOwner = TableOwner(colocatedShard->relationId);
CopyShardForeignConstraintCommandListGrouped(colocatedShard,
&colocatedShardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
List *commandList = list_concat(colocatedShardForeignConstraintCommandList,
referenceTableForeignConstraintList);
if (PartitionTable(colocatedShard->relationId))
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(colocatedShard);
commandList = lappend(commandList, attachPartitionCommand);
}
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, commandList);
}
/* finally insert the placements to pg_dist_placement */
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
}
}
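
To make the copy loop above concrete, the command list sent to the target worker for each colocated shard looks roughly like the following sketch. This is an illustration only: the shard ID, schema, DDL text, and source port are hypothetical, and the real commands are generated by CopyShardCommandList() using the worker_apply_shard_ddl_command and worker_append_table_to_shard UDFs referenced elsewhere in this change:

-- executed on the target worker, as the table owner, in a single transaction
SELECT worker_apply_shard_ddl_command (8139000, 'mcsp', 'CREATE TABLE mcsp.data (key text NOT NULL, value text NOT NULL)');
SELECT worker_append_table_to_shard ('mcsp.data_8139000', 'mcsp.data_8139000', 'localhost', 57638);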
/*
* CopyPartitionShardsCommandList gets a shardInterval which is a shard that
* belongs to a partitioned table (this is asserted).
@@ -369,19 +605,23 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
bool missingSourceOk = false;
bool missingTargetOk = false;
ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
sourceNodeName,
sourceNodePort,
missingSourceOk);
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("source placement must be in active state")));
}
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort,
missingTargetOk);
if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -391,46 +631,77 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
/*
* EnsureShardCanBeCopied checks if the given shard has a healthy placement in the source
* node and no placements in the target node.
*/
static void
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
const char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
bool missingSourceOk = false;
bool missingTargetOk = true;
ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
sourceNodeName,
sourceNodePort,
missingSourceOk);
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("source placement must be in active state")));
}
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort,
missingTargetOk);
if (targetPlacement != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("shard %ld already exist in target placement", shardId)));
}
}
/*
* SearchShardPlacementInList searches a provided list for a shard placement with the
* specified node name and port. If missingOk is set to true, this function returns NULL
* if no such placement exists in the provided list, otherwise it throws an error.
*/
ShardPlacement *
SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
uint32 nodePort, bool missingOk)
{
ListCell *shardPlacementCell = NULL;
ShardPlacement *matchingPlacement = NULL;
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = lfirst(shardPlacementCell);
if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
nodePort == shardPlacement->nodePort)
{
matchingPlacement = shardPlacement;
break;
}
}
if (matchingPlacement == NULL)
{
if (missingOk)
{
return NULL;
}
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("could not find placement matching \"%s:%d\"",
nodeName, nodePort),
errhint("Confirm the placement still exists and try again.")));
}
return matchingPlacement;
}
@@ -456,10 +727,6 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
tableRecreationCommandList);
/*
* The caller may not want to include the COPY command, e.g. when using
* logical replication to copy the data instead.
*/
if (includeDataCopy)
{
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,

View File

@@ -270,8 +270,10 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
List *shardPlacementList = ShardPlacementList(shardId);
bool missingWorkerOk = true;
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort,
missingWorkerOk);
char *tableOwner = TableOwner(shardInterval->relationId);
/*

View File

@@ -165,9 +165,7 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt
List **
referenceTableForeignConstraintList);
extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
const char *nodeName, uint32 nodePort,
bool missingOk);
#endif /* MASTER_PROTOCOL_H */

View File

@@ -0,0 +1,68 @@
-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
CREATE SCHEMA mcsp;
SET search_path TO mcsp;
SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'statement';
CREATE TABLE data (
key text primary key,
value text not null,
check (value <> '')
);
CREATE INDEX ON data (value);
SELECT create_distributed_table('data','key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE history (
key text not null,
t timestamptz not null,
value text not null
) PARTITION BY RANGE (t);
CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
SELECT create_distributed_table('history','key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO data VALUES ('key-1', 'value-1');
INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
do_repair := false);
master_copy_shard_placement
---------------------------------------------------------------------
(1 row)
-- forcefully mark the old replica as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
-- should still have all data available thanks to new replica
SELECT count(*) FROM data;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM history;
count
---------------------------------------------------------------------
2
(1 row)
DROP SCHEMA mcsp CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table data
drop cascades to table history

View File

@@ -200,7 +200,7 @@ test: multi_complex_count_distinct multi_select_distinct
test: multi_modifications
test: multi_distribution_metadata
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
test: multi_upsert multi_simple_queries multi_data_types
test: multi_upsert multi_simple_queries multi_data_types master_copy_shard_placement
# multi_utilities cannot be run in parallel with other tests because it checks
# global locks
test: multi_utilities

View File

@@ -0,0 +1,49 @@
-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
CREATE SCHEMA mcsp;
SET search_path TO mcsp;
SET citus.next_shard_id TO 8139000;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'statement';
CREATE TABLE data (
key text primary key,
value text not null,
check (value <> '')
);
CREATE INDEX ON data (value);
SELECT create_distributed_table('data','key');
CREATE TABLE history (
key text not null,
t timestamptz not null,
value text not null
) PARTITION BY RANGE (t);
CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
SELECT create_distributed_table('history','key');
INSERT INTO data VALUES ('key-1', 'value-1');
INSERT INTO data VALUES ('key-2', 'value-2');
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
-- replicate shard that contains key-1
SELECT master_copy_shard_placement(
get_shard_id_for_distribution_column('data', 'key-1'),
'localhost', :worker_2_port,
'localhost', :worker_1_port,
do_repair := false);
-- forcefully mark the old replica as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
-- should still have all data available thanks to new replica
SELECT count(*) FROM data;
SELECT count(*) FROM history;
DROP SCHEMA mcsp CASCADE;