Refactor ReplicateColocatedShardPlacement

pull/3742/head
Hadi Moshayedi 2020-04-09 11:31:35 -07:00
parent 2b2a146af4
commit 2218b7e38d
1 changed files with 95 additions and 82 deletions

View File

@ -53,7 +53,7 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName
int32 sourceNodePort, char *targetNodeName, int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, int32 targetNodePort,
char shardReplicationMode); char shardReplicationMode);
static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName, int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort); int32 targetNodePort);
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
@ -67,6 +67,8 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
int32 targetNodePort); int32 targetNodePort);
static List * RecreateTableDDLCommandList(Oid relationId); static List * RecreateTableDDLCommandList(Oid relationId);
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
static void EnsureTableListOwner(List *tableIdList);
static void EnsureTableListSuitableForReplication(List *tableIdList);
/* declarations for dynamic loading */ /* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(master_copy_shard_placement);
@ -383,57 +385,21 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
List *colocatedTableList = ColocatedTableList(distributedTableId); List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval); List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
Oid colocatedTableId = InvalidOid;
ListCell *colocatedShardCell = NULL;
foreach_oid(colocatedTableId, colocatedTableList) EnsureTableListOwner(colocatedTableList);
{ EnsureTableListSuitableForReplication(colocatedTableList);
char relationKind = '\0';
/* check that user has owner rights in all co-located tables */
EnsureTableOwner(colocatedTableId);
relationKind = get_rel_relkind(colocatedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate shard"),
errdetail("Table %s is a foreign table. Replicating "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
colocatedTableId);
if (foreignConstraintCommandList != NIL &&
PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("This shard has foreign constraints on it. "
"Citus currently supports "
"foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with "
"other replication factors, please contact us at "
"https://citusdata.com/about/contact_us.")));
}
}
/* /*
* We sort colocatedShardList so that lock operations will not cause any * We sort shardIntervalList so that lock operations will not cause any
* deadlocks. * deadlocks.
*/ */
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
BlockWritesToShardList(colocatedShardList); BlockWritesToShardList(colocatedShardList);
foreach(colocatedShardCell, colocatedShardList) ShardInterval *colocatedShard = NULL;
foreach_ptr(colocatedShard, colocatedShardList)
{ {
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId; uint64 colocatedShardId = colocatedShard->shardId;
/* /*
@ -457,12 +423,76 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
EnsureReferenceTablesExistOnAllNodes(); EnsureReferenceTablesExistOnAllNodes();
} }
/* CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.
*/
CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort); targetNodeName, targetNodePort);
/* finally insert the placements to pg_dist_placement */
foreach_ptr(colocatedShard, colocatedShardList)
{
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
}
}
/*
* EnsureTableListOwner ensures current user owns given tables. Superusers
* are regarded as owners.
*/
static void
EnsureTableListOwner(List *tableIdList)
{
Oid tableId = InvalidOid;
foreach_oid(tableId, tableIdList)
{
EnsureTableOwner(tableId);
}
}
/*
* EnsureTableListSuitableForReplication errors out if given tables are not
* suitable for replication.
*/
static void
EnsureTableListSuitableForReplication(List *tableIdList)
{
Oid tableId = InvalidOid;
foreach_oid(tableId, tableIdList)
{
char relationKind = get_rel_relkind(tableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(tableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate shard"),
errdetail("Table %s is a foreign table. Replicating "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
List *foreignConstraintCommandList =
GetTableForeignConstraintCommands(tableId);
if (foreignConstraintCommandList != NIL &&
PartitionMethod(tableId) != DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("This shard has foreign constraints on it. "
"Citus currently supports "
"foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with "
"other replication factors, please contact us at "
"https://citusdata.com/about/contact_us.")));
}
}
} }
@ -473,28 +503,25 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
* necessary. * necessary.
*/ */
static void static void
CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort) char *targetNodeName, int32 targetNodePort)
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = NULL;
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
/* iterate through the colocated shards and copy each */ /* iterate through the colocated shards and copy each */
foreach(colocatedShardCell, colocatedShardList) foreach_ptr(shardInterval, shardIntervalList)
{ {
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
bool includeDataCopy = true; bool includeDataCopy = true;
if (PartitionedTable(colocatedShard->relationId)) if (PartitionedTable(shardInterval->relationId))
{ {
/* partitioned tables contain no data */ /* partitioned tables contain no data */
includeDataCopy = false; includeDataCopy = false;
} }
List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName, List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName,
sourceNodePort, includeDataCopy); sourceNodePort, includeDataCopy);
char *tableOwner = TableOwner(colocatedShard->relationId); char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList); tableOwner, ddlCommandList);
@ -517,25 +544,23 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod
* the copied shard would get updated twice because of a cascading DML coming * the copied shard would get updated twice because of a cascading DML coming
* from both of the placements. * from both of the placements.
*/ */
foreach(colocatedShardCell, colocatedShardList) foreach_ptr(shardInterval, shardIntervalList)
{ {
List *colocatedShardForeignConstraintCommandList = NIL; List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL; List *referenceTableForeignConstraintList = NIL;
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); char *tableOwner = TableOwner(shardInterval->relationId);
char *tableOwner = TableOwner(colocatedShard->relationId);
CopyShardForeignConstraintCommandListGrouped(colocatedShard, CopyShardForeignConstraintCommandListGrouped(shardInterval,
& &shardForeignConstraintCommandList,
colocatedShardForeignConstraintCommandList,
&referenceTableForeignConstraintList); &referenceTableForeignConstraintList);
List *commandList = list_concat(colocatedShardForeignConstraintCommandList, List *commandList = list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList); referenceTableForeignConstraintList);
if (PartitionTable(colocatedShard->relationId)) if (PartitionTable(shardInterval->relationId))
{ {
char *attachPartitionCommand = char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(colocatedShard); GenerateAttachShardPartitionCommand(shardInterval);
commandList = lappend(commandList, attachPartitionCommand); commandList = lappend(commandList, attachPartitionCommand);
} }
@ -543,18 +568,6 @@ CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNod
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, commandList); tableOwner, commandList);
} }
/* finally insert the placements to pg_dist_placement */
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
groupId);
}
} }