mirror of https://github.com/citusdata/citus.git
Make shard moves more idempotent (#6313)
Co-authored-by: Marco Slot <marco.slot@gmail.com>

Branch: pull/6319/head
Parent: a2d86214b2
Commit: 2e943a64a0
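This change makes citus_move_shard_placement and citus_copy_shard_placement idempotent: when the requested transfer appears to have already completed, they emit a WARNING and return success instead of raising an error, and a transfer whose source and target are the same node now errors out up front. A minimal sketch of the retry behavior, assuming an illustrative shard ID and ports (not taken from the diff):

-- first call performs the move
SELECT citus_move_shard_placement(102008, 'localhost', 9701, 'localhost', 9702, 'force_logical');
-- an identical retry after completion now warns and succeeds:
--   WARNING:  shard is already present on node localhost:9702
--   DETAIL:  Move may have already completed.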
@@ -75,6 +75,9 @@ static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode);
 static void ErrorIfTableCannotBeReplicated(Oid relationId);
 static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName,
                                                int targetNodePort);
+static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
+                            char *targetNodeName, int targetNodePort,
+                            const char *operationName);
 static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
                                              int32 sourceNodePort, char *targetNodeName,
                                              int32 targetNodePort,
@@ -107,6 +110,8 @@ static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
                                                            int32 sourceNodePort,
                                                            char *targetNodeName,
                                                            int32 targetNodePort);
+static bool IsShardListOnNode(List *colocatedShardList, char *targetNodeName,
+                              uint32 targetPort);
 static void CheckSpaceConstraints(MultiConnection *connection,
                                   uint64 colocationSizeInBytes);
 static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
@@ -236,6 +241,10 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
     ListCell *colocatedTableCell = NULL;
     ListCell *colocatedShardCell = NULL;
 
+    ErrorIfSameNode(sourceNodeName, sourceNodePort,
+                    targetNodeName, targetNodePort,
+                    "move");
+
     Oid relationId = RelationIdForShard(shardId);
     ErrorIfMoveUnsupportedTableType(relationId);
     ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
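With ErrorIfSameNode called at the top of citus_move_shard_placement, a move whose source and target are the same host:port now fails fast, before any data movement starts. An illustrative invocation (shard ID and port are assumptions):

SELECT citus_move_shard_placement(102008, 'localhost', 9701, 'localhost', 9701, 'force_logical');
-- ERROR:  cannot move shard to the same node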
@@ -276,6 +285,20 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 
     /* we sort colocatedShardList so that lock operations will not cause any deadlocks */
     colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
+
+    /*
+     * If there are no active placements on the source and only active placements on
+     * the target node, we assume the copy to already be done.
+     */
+    if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+        !IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+    {
+        ereport(WARNING, (errmsg("shard is already present on node %s:%d",
+                                 targetNodeName, targetNodePort),
+                          errdetail("Move may have already completed.")));
+        PG_RETURN_VOID();
+    }
+
     foreach(colocatedShardCell, colocatedShardList)
     {
         ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
@@ -390,6 +413,39 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * IsShardListOnNode determines whether a co-located shard list has
+ * active placements on a given node.
+ */
+static bool
+IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetNodePort)
+{
+    WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
+    if (workerNode == NULL)
+    {
+        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("Moving shards to a non-existing node is not supported")));
+    }
+
+    /*
+     * We exhaustively search all co-located shards
+     */
+    ShardInterval *shardInterval = NULL;
+    foreach_ptr(shardInterval, colocatedShardList)
+    {
+        uint64 shardId = shardInterval->shardId;
+        List *placementList = ActiveShardPlacementListOnGroup(shardId,
+                                                              workerNode->groupId);
+        if (placementList == NIL)
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+
 /*
  * EnsureEnoughDiskSpaceForShardMove checks that there is enough space for
  * shard moves of the given colocated shard list from source node to target node.
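IsShardListOnNode returns true only when every shard in the co-located list has at least one active placement in the given node's group. A rough SQL analogue of that check against the Citus metadata, offered as a sketch: pg_dist_shard and pg_dist_placement are real catalogs and shardstate 1 denotes an active placement, but the group ID 14 and the table name are hypothetical, and the C code calls ActiveShardPlacementListOnGroup rather than running SQL:

SELECT bool_and(EXISTS (
           SELECT 1
           FROM pg_dist_placement p
           WHERE p.shardid = s.shardid
             AND p.groupid = 14
             AND p.shardstate = 1))
FROM pg_dist_shard s
WHERE s.logicalrelid = 'table1_group1'::regclass;
-- true  -> every colocated shard is already placed in the node's group
-- false -> at least one shard is missing, so the transfer still has work to do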
@@ -552,6 +608,25 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
 }
 
 
+/*
+ * ErrorIfSameNode throws an error if the two host:port combinations
+ * are the same.
+ */
+static void
+ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
+                char *targetNodeName, int targetNodePort,
+                const char *operationName)
+{
+    if (strncmp(sourceNodeName, targetNodeName, MAX_NODE_LENGTH) == 0 &&
+        sourceNodePort == targetNodePort)
+    {
+        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                        errmsg("cannot %s shard to the same node",
+                               operationName)));
+    }
+}
+
+
 /*
  * master_move_shard_placement is a wrapper around citus_move_shard_placement.
  */
@@ -886,6 +961,10 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
     ShardInterval *shardInterval = LoadShardInterval(shardId);
     Oid distributedTableId = shardInterval->relationId;
 
+    ErrorIfSameNode(sourceNodeName, sourceNodePort,
+                    targetNodeName, targetNodePort,
+                    "copy");
+
     ErrorIfTableCannotBeReplicated(shardInterval->relationId);
     ErrorIfTargetNodeIsNotSafeToCopyTo(targetNodeName, targetNodePort);
     EnsureNoModificationsHaveBeenDone();
@@ -904,6 +983,19 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
      */
     colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
 
+    /*
+     * If there are active placements on both nodes, we assume the copy to already
+     * be done.
+     */
+    if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+        IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+    {
+        ereport(WARNING, (errmsg("shard is already present on node %s:%d",
+                                 targetNodeName, targetNodePort),
+                          errdetail("Copy may have already completed.")));
+        return;
+    }
+
     /*
      * At this point of the shard replication, we don't need to block the writes to
      * shards when logical replication is used.
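Note the asymmetry with the move path above: a completed copy leaves active placements on both nodes, so the copy is treated as done only when both IsShardListOnNode checks pass, whereas a completed move additionally requires that the source side hold no active placements. An illustrative retry of a finished copy (shard ID and ports are assumptions):

SELECT citus_copy_shard_placement(102008, 'localhost', 9701, 'localhost', 9702, transfer_mode := 'block_writes');
-- WARNING:  shard is already present on node localhost:9702
-- DETAIL:  Copy may have already completed.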
@@ -56,14 +56,20 @@ SELECT citus_copy_shard_placement(
   'localhost', :worker_2_port,
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
-ERROR:  shard xxxxx already exists in the target node
+ERROR:  cannot copy shard to the same node
--- verify we error out if target already contains a healthy placement
+-- verify we warn if target already contains a healthy placement
 SELECT citus_copy_shard_placement(
   (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
   'localhost', :worker_1_port,
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
-ERROR:  shard xxxxx already exists in the target node
+WARNING:  shard is already present on node localhost:xxxxx
+DETAIL:  Copy may have already completed.
+ citus_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- verify we error out if table has foreign key constraints
 INSERT INTO ref_table SELECT 1, value FROM data;
 ALTER TABLE data ADD CONSTRAINT distfk FOREIGN KEY (value) REFERENCES ref_table (b) MATCH FULL;
@@ -143,9 +143,15 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_
 (1 row)
 
 \c - - - :master_port
--- copy colocated shards again to see error message
+-- copy colocated shards again to see warning
 SELECT citus_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
-ERROR:  shard xxxxx already exists in the target node
+WARNING:  shard is already present on node localhost:xxxxx
+DETAIL:  Copy may have already completed.
+ citus_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test copying NOT colocated shard
 -- status before shard copy
 SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport
@@ -300,6 +306,15 @@ ORDER BY s.shardid, sp.nodeport;
  13000011 | table2_group1 |    57638
 (14 rows)
 
+-- moving the shard again is idempotent
+SELECT citus_move_shard_placement(13000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+WARNING:  shard is already present on node localhost:xxxxx
+DETAIL:  Move may have already completed.
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
 -- also connect worker to verify we successfully moved given shard (and other colocated shards)
 \c - - - :worker_1_port
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000001'::regclass;
@@ -412,8 +427,9 @@ ORDER BY s.shardid, sp.nodeport;
 (3 rows)
 
 -- try to move shard from wrong node
-SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
-ERROR:  source placement must be in active state
+SELECT master_move_shard_placement(13000021, 'localhost', :master_port, 'localhost', :worker_1_port, 'force_logical');
+ERROR:  could not find placement matching "localhost:xxxxx"
+HINT:  Confirm the placement still exists and try again.
 -- test shard move with foreign constraints
 DROP TABLE IF EXISTS table1_group1, table2_group1;
 SET citus.shard_count TO 6;
@@ -138,20 +138,6 @@ SELECT pg_reload_conf();
  t
 (1 row)
 
-\c - - - :master_port
-BEGIN;
-SELECT
-    master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
-FROM
-    pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE
-    logicalrelid = 'mx_table_1'::regclass
-    AND nodeport = :worker_1_port
-ORDER BY
-    shardid
-LIMIT 1;
-ERROR:  source placement must be in active state
-ROLLBACK;
 \c - - - :worker_2_port
 -- before reseting citus.node_conninfo, check that CREATE SUBSCRIPTION
 -- with citus_use_authinfo takes into account node_conninfo even when
@@ -48,7 +48,7 @@ SELECT citus_copy_shard_placement(
   'localhost', :worker_2_port,
   transfer_mode := 'block_writes');
 
--- verify we error out if target already contains a healthy placement
+-- verify we warn if target already contains a healthy placement
 SELECT citus_copy_shard_placement(
   (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
   'localhost', :worker_1_port,
@@ -78,7 +78,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table2_
 \c - - - :master_port
 
 
--- copy colocated shards again to see error message
+-- copy colocated shards again to see warning
 SELECT citus_copy_shard_placement(13000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
 
 
@@ -159,6 +159,9 @@ WHERE
     AND sp.shardstate != 4
 ORDER BY s.shardid, sp.nodeport;
 
+-- moving the shard again is idempotent
+SELECT citus_move_shard_placement(13000001, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
+
 -- also connect worker to verify we successfully moved given shard (and other colocated shards)
 \c - - - :worker_1_port
 SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table1_group1_13000001'::regclass;
@@ -222,7 +225,7 @@ ORDER BY s.shardid, sp.nodeport;
 
 
 -- try to move shard from wrong node
-SELECT master_move_shard_placement(13000021, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
+SELECT master_move_shard_placement(13000021, 'localhost', :master_port, 'localhost', :worker_1_port, 'force_logical');
 
 
 -- test shard move with foreign constraints
@@ -86,23 +86,7 @@ LIMIT 1;
 ALTER SYSTEM SET citus.node_conninfo TO 'sslrootcert=/non/existing/certificate.crt sslmode=verify-full';
 SELECT pg_reload_conf();
-
-\c - - - :master_port
-
-BEGIN;
-SELECT
-    master_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
-FROM
-    pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE
-    logicalrelid = 'mx_table_1'::regclass
-    AND nodeport = :worker_1_port
-ORDER BY
-    shardid
-LIMIT 1;
-ROLLBACK;
-
 \c - - - :worker_2_port
 
 -- before reseting citus.node_conninfo, check that CREATE SUBSCRIPTION
 -- with citus_use_authinfo takes into account node_conninfo even when
 -- one of host, port, or user parameters are not specified.