mirror of https://github.com/citusdata/citus.git
Merge pull request #908 from citusdata/only_repair_given_shard
Only repair given shardpull/900/head
commit
61f6baf9e3
|
@ -39,17 +39,15 @@
|
|||
|
||||
|
||||
/* local function forward declarations */
|
||||
static void RepairShardPlacement(int64 shardId, char *sourceNodeName,
|
||||
int32 sourceNodePort, char *targetNodeName,
|
||||
int32 targetNodePort);
|
||||
static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
|
||||
int32 sourceNodePort, char *targetNodeName,
|
||||
int32 targetNodePort);
|
||||
static void EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName,
|
||||
int32 sourceNodePort);
|
||||
static ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
|
||||
char *nodeName, uint32 nodePort,
|
||||
bool missingOk);
|
||||
static void CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
|
||||
char *targetNodeName, int32 targetNodePort,
|
||||
bool doRepair);
|
||||
static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
|
||||
int32 sourceNodePort);
|
||||
static List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval);
|
||||
|
@ -79,87 +77,13 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
|
|||
int32 sourceNodePort = PG_GETARG_INT32(2);
|
||||
text *targetNodeNameText = PG_GETARG_TEXT_P(3);
|
||||
int32 targetNodePort = PG_GETARG_INT32(4);
|
||||
bool doRepair = true;
|
||||
|
||||
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
|
||||
char *targetNodeName = text_to_cstring(targetNodeNameText);
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid distributedTableId = shardInterval->relationId;
|
||||
List *colocatedTableList = ColocatedTableList(distributedTableId);
|
||||
ListCell *colocatedTableCell = NULL;
|
||||
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||
ListCell *colocatedShardCell = NULL;
|
||||
|
||||
foreach(colocatedTableCell, colocatedTableList)
|
||||
{
|
||||
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
|
||||
char relationKind = '\0';
|
||||
|
||||
/* check that user has owner rights in all co-located tables */
|
||||
EnsureTableOwner(colocatedTableId);
|
||||
|
||||
relationKind = get_rel_relkind(colocatedTableId);
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
char *relationName = get_rel_name(colocatedTableId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot repair shard"),
|
||||
errdetail("Table %s is a foreign table. Repairing "
|
||||
"shards backed by foreign tables is "
|
||||
"not supported.", relationName)));
|
||||
}
|
||||
}
|
||||
|
||||
/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
|
||||
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||
uint64 colocatedShardId = colocatedShard->shardId;
|
||||
|
||||
/*
|
||||
* We've stopped data modifications of this shard, but we plan to move
|
||||
* a placement to the healthy state, so we need to grab a shard metadata
|
||||
* lock (in exclusive mode) as well.
|
||||
*/
|
||||
LockShardDistributionMetadata(colocatedShardId, ExclusiveLock);
|
||||
|
||||
/*
|
||||
* If our aim is repairing, we should be sure that there is an unhealthy
|
||||
* placement in target node. We use EnsureShardCanBeRepaired function
|
||||
* to be sure that there is an unhealthy placement in target node. If
|
||||
* we just want to copy the shard without any repair, it is enough to use
|
||||
* EnsureShardCanBeCopied which just checks there is a placement in source
|
||||
* and no placement in target node.
|
||||
*/
|
||||
if (doRepair)
|
||||
{
|
||||
/*
|
||||
* After #810 is fixed, we should remove this check and call EnsureShardCanBeRepaired
|
||||
* for all shard ids
|
||||
*/
|
||||
if (colocatedShardId == shardId)
|
||||
{
|
||||
EnsureShardCanBeRepaired(colocatedShardId, sourceNodeName, sourceNodePort,
|
||||
targetNodeName, targetNodePort);
|
||||
}
|
||||
else
|
||||
{
|
||||
EnsureShardCanBeMoved(colocatedShardId, sourceNodeName, sourceNodePort);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("master_copy_shard_placement() without repair option "
|
||||
"is only supported on Citus Enterprise")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* CopyShardPlacement function copies given shard with its co-located shards */
|
||||
CopyShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||
targetNodePort, doRepair);
|
||||
/* RepairShardPlacement function repairs only given shard */
|
||||
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||
targetNodePort);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -178,6 +102,66 @@ master_move_shard_placement(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RepairShardPlacement repairs given shard from a source node to target node.
|
||||
* This function is not co-location aware. It only repairs given shard.
|
||||
*/
|
||||
static void
|
||||
RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
|
||||
char *targetNodeName, int32 targetNodePort)
|
||||
{
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid distributedTableId = shardInterval->relationId;
|
||||
|
||||
char relationKind = get_rel_relkind(distributedTableId);
|
||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||
bool missingOk = false;
|
||||
|
||||
List *ddlCommandList = NIL;
|
||||
List *foreignConstraintCommandList = NIL;
|
||||
List *placementList = NIL;
|
||||
ShardPlacement *placement = NULL;
|
||||
|
||||
EnsureTableOwner(distributedTableId);
|
||||
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
char *relationName = get_rel_name(distributedTableId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot repair shard"),
|
||||
errdetail("Table %s is a foreign table. Repairing "
|
||||
"shards backed by foreign tables is "
|
||||
"not supported.", relationName)));
|
||||
}
|
||||
|
||||
/*
|
||||
* We plan to move the placement to the healthy state, so we need to grab a shard
|
||||
* metadata lock (in exclusive mode).
|
||||
*/
|
||||
LockShardDistributionMetadata(shardId, ExclusiveLock);
|
||||
|
||||
/*
|
||||
* For shard repair, there should be healthy placement in source node and unhealthy
|
||||
* placement in the target node.
|
||||
*/
|
||||
EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||
targetNodePort);
|
||||
|
||||
/* we generate necessary commands to recreate the shard in target node */
|
||||
ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort);
|
||||
foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval);
|
||||
ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList);
|
||||
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner,
|
||||
ddlCommandList);
|
||||
|
||||
/* after successful repair, we update shard state as healthy*/
|
||||
placementList = ShardPlacementList(shardId);
|
||||
placement = SearchShardPlacementInList(placementList, targetNodeName, targetNodePort,
|
||||
missingOk);
|
||||
UpdateShardPlacementState(placement->placementId, FILE_FINALIZED);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source
|
||||
* node and inactive node on the target node.
|
||||
|
@ -210,24 +194,6 @@ EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePo
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureShardCanBeMoved checks if the given shard has a placement in the source node but
|
||||
* not on the target node. It is important to note that SearchShardPlacementInList
|
||||
* function already generates error if given shard does not have a placement in the
|
||||
* source node. Therefore we do not perform extra check.
|
||||
*/
|
||||
static void
|
||||
EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, int32 sourceNodePort)
|
||||
{
|
||||
List *shardPlacementList = ShardPlacementList(shardId);
|
||||
bool missingSourceOk = false;
|
||||
|
||||
/* Actual check is done in SearchShardPlacementInList */
|
||||
SearchShardPlacementInList(shardPlacementList, sourceNodeName, sourceNodePort,
|
||||
missingSourceOk);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SearchShardPlacementInList searches a provided list for a shard placement with the
|
||||
* specified node name and port. If missingOk is set to true, this function returns NULL
|
||||
|
@ -269,74 +235,6 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyShardPlacement copies a shard along with its co-located shards from a source node
|
||||
* to target node. CopyShardPlacement does not make any checks about state of the shards.
|
||||
* It is caller's responsibility to make those checks if they are necessary.
|
||||
*/
|
||||
static void
|
||||
CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
|
||||
char *targetNodeName, int32 targetNodePort, bool doRepair)
|
||||
{
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||
ListCell *colocatedShardCell = NULL;
|
||||
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||
List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
|
||||
sourceNodePort);
|
||||
char *tableOwner = TableOwner(colocatedShard->relationId);
|
||||
|
||||
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
||||
tableOwner, ddlCommandList);
|
||||
}
|
||||
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||
List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
|
||||
colocatedShard);
|
||||
char *tableOwner = TableOwner(colocatedShard->relationId);
|
||||
|
||||
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
||||
tableOwner,
|
||||
foreignConstraintCommandList);
|
||||
}
|
||||
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||
uint64 colocatedShardId = colocatedShard->shardId;
|
||||
|
||||
/*
|
||||
* If we call this function for repair purposes, the caller should have
|
||||
* removed the old shard placement metadata.
|
||||
*/
|
||||
if (doRepair)
|
||||
{
|
||||
List *shardPlacementList = ShardPlacementList(colocatedShardId);
|
||||
bool missingSourceOk = false;
|
||||
|
||||
ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList,
|
||||
targetNodeName,
|
||||
targetNodePort,
|
||||
missingSourceOk);
|
||||
|
||||
UpdateShardPlacementState(placement->placementId, FILE_FINALIZED);
|
||||
}
|
||||
else
|
||||
{
|
||||
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
|
||||
FILE_FINALIZED, ShardLength(colocatedShardId),
|
||||
targetNodeName,
|
||||
targetNodePort);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyShardCommandList generates command list to copy the given shard placement
|
||||
* from the source node to the target node.
|
||||
|
|
|
@ -63,7 +63,7 @@ ORDER BY s.shardid;
|
|||
1300002 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 3
|
||||
1300004 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57638 | 1000 | 1
|
||||
|
@ -187,7 +187,7 @@ ORDER BY s.shardid;
|
|||
1300002 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 3
|
||||
1300004 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57638 | 1000 | 1
|
||||
|
@ -219,7 +219,7 @@ ORDER BY s.shardid;
|
|||
1300002 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57637 | 1000 | 1
|
||||
1300003 | table1_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 1
|
||||
1300004 | table2_group1 | 57638 | 1000 | 3
|
||||
1300004 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57637 | 1000 | 1
|
||||
1300005 | table2_group1 | 57638 | 1000 | 1
|
||||
|
|
Loading…
Reference in New Issue