Fix reporting of progress on waiting and moved shards (#6274)

In commit 31faa88a4e I removed some features of the rebalance progress
monitor. I did this because the plan was to remove the foreground shard
rebalancer later in the PR that would add the background shard
rebalancer. So, I didn't want to spend time fixing something that we
would throw away anyway.

As it turns out we're not removing the foreground shard rebalancer after
all, so it made sense to fix the stuff that I broke. This PR does that.
For the most part this commit reverts the changes in commit 31faa88a4e.
It's not a full revert though, because it keeps the improved tests and
the changes to `citus_move_shard_placement`.
pull/6276/head
Jelte Fennema 2022-08-31 13:55:47 +02:00 committed by GitHub
parent 98dcbeb304
commit 8bb082e77d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 29 deletions

View File

@ -21,6 +21,7 @@
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_enum.h" #include "catalog/pg_enum.h"
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h" #include "distributed/commands.h"
@ -397,15 +398,28 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
targetNodeName, targetNodePort); targetNodeName, targetNodePort);
/*
* We want to be able to track progress of shard moves using
* get_rebalancer_progress. If this move is initiated by the rebalancer,
* then the rebalancer call has already set up the shared memory that is
* used to do that. But if citus_move_shard_placement is called directly by
* the user (or through any other mechanism), then the shared memory is not
* set up yet. In that case we do it here.
*/
if (!IsRebalancerInternalBackend())
{
WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
PlacementUpdateEvent *placementUpdateEvent = palloc0(sizeof(PlacementUpdateEvent)); PlacementUpdateEvent *placementUpdateEvent = palloc0(
sizeof(PlacementUpdateEvent));
placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE;
placementUpdateEvent->shardId = shardId; placementUpdateEvent->shardId = shardId;
placementUpdateEvent->sourceNode = sourceNode; placementUpdateEvent->sourceNode = sourceNode;
placementUpdateEvent->targetNode = targetNode; placementUpdateEvent->targetNode = targetNode;
SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId); SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId,
REBALANCE_PROGRESS_MOVING);
}
/* /*
* At this point of the shard moves, we don't need to block the writes to * At this point of the shard moves, we don't need to block the writes to

View File

@ -201,6 +201,8 @@ static int PlacementsHashCompare(const void *lhsKey, const void *rhsKey, Size ke
static uint32 PlacementsHashHashCode(const void *key, Size keySize); static uint32 PlacementsHashHashCode(const void *key, Size keySize);
static bool WorkerNodeListContains(List *workerNodeList, const char *workerName, static bool WorkerNodeListContains(List *workerNodeList, const char *workerName,
uint32 workerPort); uint32 workerPort);
static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName,
int sourcePort, uint64 progress);
static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
static NodeFillState * FindFillStateForPlacement(RebalanceState *state, static NodeFillState * FindFillStateForPlacement(RebalanceState *state,
ShardPlacement *placement); ShardPlacement *placement);
@ -258,6 +260,7 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
bool RunningUnderIsolationTest = false; bool RunningUnderIsolationTest = false;
int MaxRebalancerLoggedIgnoredMoves = 5; int MaxRebalancerLoggedIgnoredMoves = 5;
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
/* /*
@ -763,7 +766,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
* dsm handle so that it can be used for updating the progress and cleaning things up. * dsm handle so that it can be used for updating the progress and cleaning things up.
*/ */
void void
SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) SetupRebalanceMonitor(List *placementUpdateList,
Oid relationId,
uint64 initialProgressState)
{ {
List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
ListCell *colocatedUpdateCell = NULL; ListCell *colocatedUpdateCell = NULL;
@ -787,7 +792,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
event->shardId = colocatedUpdate->shardId; event->shardId = colocatedUpdate->shardId;
event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->sourcePort = colocatedUpdate->sourceNode->workerPort;
event->targetPort = colocatedUpdate->targetNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort;
pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_MOVING); pg_atomic_init_u64(&event->progress, initialProgressState);
eventIndex++; eventIndex++;
} }
@ -1186,34 +1191,63 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex; PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
uint64 shardId = step->shardId; uint64 shardId = step->shardId;
uint64 shardSize = 0;
uint64 backupShardSize = 0;
uint64 progress = pg_atomic_read_u64(&step->progress);
uint64 shardSize = WorkerShardSize(shardStatistics, step->sourceName, uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
step->sourcePort, shardId); step->sourcePort, shardId);
uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
step->targetPort, shardId);
if (progress == REBALANCE_PROGRESS_WAITING ||
progress == REBALANCE_PROGRESS_MOVING)
{
/*
* If we are not done with the move, the correct shard size is the
* size on the source.
*/
shardSize = sourceSize;
backupShardSize = targetSize;
}
else if (progress == REBALANCE_PROGRESS_MOVED)
{
/*
* If we are done with the move, the correct shard size is the size
* on the target
*/
shardSize = targetSize;
backupShardSize = sourceSize;
}
if (shardSize == 0) if (shardSize == 0)
{ {
/* if (backupShardSize == 0)
* It's possible that we are reading the sizes after the move has
* already finished. This means that the shards on the source
* might have already been deleted. In that case we instead report
* the size on the target as the shard size, since that is now the
* only existing shard.
*/
shardSize = WorkerShardSize(shardStatistics, step->targetName,
step->targetPort, shardId);
if (shardSize == 0)
{ {
/* /*
* We don't have any useful shard size. This can happen when a * We don't have any useful shard size. This can happen when a
* shard is moved multiple times and it is not present on * shard is moved multiple times and it is not present on
* either of these nodes. Probably the shard is on a worker * either of these nodes. Probably the shard is on a worker
* related to the next move. In the weird case that this shard * related to another event. In the weird case that this shard
* is on the nodes and actually is size 0, we will have no * is on the nodes and actually is size 0, we will have no
* entry in the hashmap. When fetching from it we always * entry in the hashmap. When fetching from it we always
* default to 0 if no entry is found, so that's fine. * default to 0 if no entry is found, so that's fine.
*/ */
continue; continue;
} }
/*
* Because of the way we fetch shard sizes they are from a slightly
* earlier moment than the progress state we just read from shared
* memory. Usually this is no problem, but there exist some race
* conditions where this matters. For example, for very quick moves
* it is possible that even though a step is now reported as MOVED,
* when we read the shard sizes the move had not even started yet.
* This in turn can mean that the target size is 0 while the source
* size is not. We try to handle such rare edge cases by falling
* back on the other shard size if that one is not 0.
*/
shardSize = backupShardSize;
} }
@ -1427,6 +1461,15 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount,
AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort, AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort,
step->shardId); step->shardId);
if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING)
{
/*
* shard move has not started so we don't need target stats for
* this shard
*/
continue;
}
AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort, AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort,
step->shardId); step->shardId);
} }
@ -1559,6 +1602,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
* This uses the first relationId from the list, it's only used for display * This uses the first relationId from the list, it's only used for display
* purposes so it does not really matter which to show * purposes so it does not really matter which to show
*/ */
SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList),
REBALANCE_PROGRESS_WAITING);
ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving");
FinalizeCurrentProgressMonitor(); FinalizeCurrentProgressMonitor();
} }
@ -1635,11 +1680,21 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
errmsg("only moving or copying shards is supported"))); errmsg("only moving or copying shards is supported")));
} }
UpdateColocatedShardPlacementProgress(shardId,
sourceNode->workerName,
sourceNode->workerPort,
REBALANCE_PROGRESS_MOVING);
/* /*
* In case of failure, we throw an error such that rebalance_table_shards * In case of failure, we throw an error such that rebalance_table_shards
* fails early. * fails early.
*/ */
ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data); ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data);
UpdateColocatedShardPlacementProgress(shardId,
sourceNode->workerName,
sourceNode->workerPort,
REBALANCE_PROGRESS_MOVED);
} }
@ -2700,6 +2755,51 @@ WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 work
} }
/*
* UpdateColocatedShardPlacementProgress updates the progress of the given placement,
* along with its colocated placements, to the given state.
*/
static void
UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sourcePort,
uint64 progress)
{
ProgressMonitorData *header = GetCurrentProgressMonitor();
if (header != NULL)
{
PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header);
ListCell *colocatedShardIntervalCell = NULL;
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval);
for (int moveIndex = 0; moveIndex < header->stepCount; moveIndex++)
{
PlacementUpdateEventProgress *step = steps + moveIndex;
uint64 currentShardId = step->shardId;
bool colocatedShard = false;
foreach(colocatedShardIntervalCell, colocatedShardIntervalList)
{
ShardInterval *candidateShard = lfirst(colocatedShardIntervalCell);
if (candidateShard->shardId == currentShardId)
{
colocatedShard = true;
break;
}
}
if (colocatedShard &&
strcmp(step->sourceName, sourceName) == 0 &&
step->sourcePort == sourcePort)
{
pg_atomic_write_u64(&step->progress, progress);
}
}
}
}
/* /*
* pg_dist_rebalance_strategy_enterprise_check is a now removed function, but * pg_dist_rebalance_strategy_enterprise_check is a now removed function, but
* to avoid issues during upgrades a C stub is kept. * to avoid issues during upgrades a C stub is kept.

View File

@ -109,11 +109,17 @@ GetCurrentProgressMonitor(void)
/* /*
* FinalizeCurrentProgressMonitor releases the dynamic memory segment of the current * FinalizeCurrentProgressMonitor releases the dynamic memory segment of the current
* progress monitoring data structure and removes the process from * progress monitoring data structure and removes the process from
* pg_stat_get_progress_info() output. * pg_stat_get_progress_info() output. If there's no such dynamic memory
* segment this is a no-op.
*/ */
void void
FinalizeCurrentProgressMonitor(void) FinalizeCurrentProgressMonitor(void)
{ {
if (currentProgressDSMHandle == DSM_HANDLE_INVALID)
{
return;
}
dsm_segment *dsmSegment = dsm_find_mapping(currentProgressDSMHandle); dsm_segment *dsmSegment = dsm_find_mapping(currentProgressDSMHandle);
if (dsmSegment != NULL) if (dsmSegment != NULL)

View File

@ -73,7 +73,9 @@
/* *INDENT-ON* */ /* *INDENT-ON* */
#define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337 #define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337
#define REBALANCE_PROGRESS_WAITING 0
#define REBALANCE_PROGRESS_MOVING 1 #define REBALANCE_PROGRESS_MOVING 1
#define REBALANCE_PROGRESS_MOVED 2
/* Enumeration that defines different placement update types */ /* Enumeration that defines different placement update types */
typedef enum typedef enum
@ -193,7 +195,9 @@ extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlace
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command); extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
extern void AcquirePlacementColocationLock(Oid relationId, int lockMode, extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
const char *operationName); const char *operationName);
extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId);
extern void SetupRebalanceMonitor(List *placementUpdateList,
Oid relationId,
uint64 initialProgressState);
#endif /* SHARD_REBALANCER_H */ #endif /* SHARD_REBALANCER_H */

View File

@ -35,7 +35,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
--------------------------------------------------------------------- ---------------------------------------------------------------------
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1 colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1
(2 rows) colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0
(4 rows)
step s2-unlock-1-start: step s2-unlock-1-start:
ROLLBACK; ROLLBACK;
@ -112,9 +114,11 @@ step s7-get-progress:
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
--------------------------------------------------------------------- ---------------------------------------------------------------------
colocated1|1500001| 73728|localhost | 57637| 0|localhost | 57638| 73728| 2
colocated2|1500005| 401408|localhost | 57637| 0|localhost | 57638| 401408| 2
colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1 colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1 colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1
(2 rows) (4 rows)
step s3-unlock-2-start: step s3-unlock-2-start:
ROLLBACK; ROLLBACK;
@ -205,7 +209,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
--------------------------------------------------------------------- ---------------------------------------------------------------------
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1 colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
(2 rows) colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0
(4 rows)
step s7-release-lock: step s7-release-lock:
COMMIT; COMMIT;
@ -288,7 +294,9 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
--------------------------------------------------------------------- ---------------------------------------------------------------------
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1 colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1 colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1
(2 rows) colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0
(4 rows)
step s6-release-advisory-lock: step s6-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152); SELECT pg_advisory_unlock(44000, 55152);