From 96912d9ba15789c17b95d3e5b37b82c8a5355f4e Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 17 Oct 2022 16:55:31 +0300 Subject: [PATCH] Add status column to get_rebalance_progress() (#6403) DESCRIPTION: Adds status column to get_rebalance_progress() Introduces a new column named `status` for the function `get_rebalance_progress()`. For each ongoing shard move, this column will reveal information about that shard move operation's current status. For now, candidate status messages could be one of the below. * Not Started * Setting Up * Copying Data * Catching Up * Creating Constraints * Final Catchup * Creating Foreign Keys * Completing * Completed --- .../distributed/operations/shard_rebalancer.c | 27 ++- .../distributed/operations/shard_transfer.c | 110 +++++++++++- .../distributed/progress/multi_progress.c | 11 ++ .../replication/multi_logical_replication.c | 37 ++++ .../udfs/get_rebalance_progress/11.2-1.sql | 3 +- .../udfs/get_rebalance_progress/latest.sql | 3 +- src/include/distributed/multi_progress.h | 1 + src/include/distributed/shard_rebalancer.h | 17 +- src/include/distributed/shard_transfer.h | 7 +- .../isolation_shard_rebalancer_progress.out | 164 ++++++++++-------- src/test/regress/expected/multi_extension.out | 2 +- .../regress/expected/shard_rebalancer.out | 4 +- .../isolation_shard_rebalancer_progress.spec | 3 +- 13 files changed, 303 insertions(+), 86 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 45f4a07f2..47cc94959 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -283,6 +283,18 @@ static const char *PlacementUpdateTypeNames[] = { [PLACEMENT_UPDATE_COPY] = "copy", }; +static const char *PlacementUpdateStatusNames[] = { + [PLACEMENT_UPDATE_STATUS_NOT_STARTED_YET] = "Not Started Yet", + [PLACEMENT_UPDATE_STATUS_SETTING_UP] = "Setting Up", + [PLACEMENT_UPDATE_STATUS_COPYING_DATA] = "Copying Data", + [PLACEMENT_UPDATE_STATUS_CATCHING_UP] = "Catching Up", + [PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS] = "Creating Constraints", + [PLACEMENT_UPDATE_STATUS_FINAL_CATCH_UP] = "Final Catchup", + [PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS] = "Creating Foreign Keys", + [PLACEMENT_UPDATE_STATUS_COMPLETING] = "Completing", + [PLACEMENT_UPDATE_STATUS_COMPLETED] = "Completed", +}; + #ifdef USE_ASSERT_CHECKING /* @@ -797,7 +809,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId, - uint64 initialProgressState) + uint64 initialProgressState, + PlacementUpdateStatus initialStatus) { List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList); ListCell *colocatedUpdateCell = NULL; @@ -822,6 +835,7 @@ SetupRebalanceMonitor(List *placementUpdateList, event->sourcePort = colocatedUpdate->sourceNode->workerPort; event->targetPort = colocatedUpdate->targetNode->workerPort; event->updateType = colocatedUpdate->updateType; + pg_atomic_init_u64(&event->updateStatus, initialStatus); pg_atomic_init_u64(&event->progress, initialProgressState); eventIndex++; @@ -1261,8 +1275,8 @@ get_rebalance_progress(PG_FUNCTION_ARGS) shardSize = shardSizesStat->totalSize; } - Datum values[14]; - bool nulls[14]; + Datum values[15]; + bool nulls[15]; memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -1282,6 +1296,10 @@ get_rebalance_progress(PG_FUNCTION_ARGS) cstring_to_text(PlacementUpdateTypeNames[step->updateType])); values[12] = LSNGetDatum(sourceLSN); values[13] = LSNGetDatum(targetLSN); + values[14] = PointerGetDatum(cstring_to_text( + PlacementUpdateStatusNames[ + pg_atomic_read_u64( + &step->updateStatus)])); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } @@ -1794,7 +1812,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) * purposes so it does not really matter which to show */ SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList), - REBALANCE_PROGRESS_WAITING); + REBALANCE_PROGRESS_WAITING, + PLACEMENT_UPDATE_STATUS_NOT_STARTED_YET); ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); FinalizeCurrentProgressMonitor(); } diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index cdb61731f..6bab424d7 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -338,9 +338,16 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) placementUpdateEvent->sourceNode = sourceNode; placementUpdateEvent->targetNode = targetNode; SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, - REBALANCE_PROGRESS_MOVING); + REBALANCE_PROGRESS_MOVING, + PLACEMENT_UPDATE_STATUS_SETTING_UP); } + UpdatePlacementUpdateStatusForShardIntervalList( + colocatedShardList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_SETTING_UP); + /* * At this point of the shard moves, we don't need to block the writes to * shards when logical replication is used. @@ -408,6 +415,12 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) sourceNodePort, targetNodeName, targetNodePort); + UpdatePlacementUpdateStatusForShardIntervalList( + colocatedShardList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETED); + FinalizeCurrentProgressMonitor(); PG_RETURN_VOID(); } @@ -1007,8 +1020,14 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, placementUpdateEvent->sourceNode = sourceNode; placementUpdateEvent->targetNode = targetNode; SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, - REBALANCE_PROGRESS_MOVING); + REBALANCE_PROGRESS_MOVING, + PLACEMENT_UPDATE_STATUS_SETTING_UP); + UpdatePlacementUpdateStatusForShardIntervalList( + colocatedShardList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_SETTING_UP); /* * At this point of the shard replication, we don't need to block the writes to @@ -1078,6 +1097,13 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, SendCommandToWorkersWithMetadata(placementCommand); } } + + UpdatePlacementUpdateStatusForShardIntervalList( + colocatedShardList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETED); + FinalizeCurrentProgressMonitor(); } @@ -1257,8 +1283,20 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, tableOwner, ddlCommandList); } + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + foreach_ptr(shardInterval, shardIntervalList) { List *ddlCommandList = @@ -1291,6 +1329,12 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } } + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + /* * Iterate through the colocated shards and create DDL commamnds * to create the foreign constraints. @@ -1322,6 +1366,12 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, shardCommandList->ddlCommandList); } + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); + MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } @@ -1937,3 +1987,59 @@ WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId) return applyDDLCommandList; } + + +/* + * UpdatePlacementUpdateStatusForShardIntervalList updates the status field for shards + * in the given shardInterval list. + */ +void +UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalList, + char *sourceName, int sourcePort, + PlacementUpdateStatus status) +{ + List *segmentList = NIL; + List *rebalanceMonitorList = NULL; + + if (!HasProgressMonitor()) + { + rebalanceMonitorList = ProgressMonitorList(REBALANCE_ACTIVITY_MAGIC_NUMBER, + &segmentList); + } + else + { + rebalanceMonitorList = list_make1(GetCurrentProgressMonitor()); + } + + ProgressMonitorData *monitor = NULL; + foreach_ptr(monitor, rebalanceMonitorList) + { + PlacementUpdateEventProgress *steps = ProgressMonitorSteps(monitor); + + for (int moveIndex = 0; moveIndex < monitor->stepCount; moveIndex++) + { + PlacementUpdateEventProgress *step = steps + moveIndex; + uint64 currentShardId = step->shardId; + bool foundInList = false; + + ShardInterval *candidateShard = NULL; + foreach_ptr(candidateShard, shardIntervalList) + { + if (candidateShard->shardId == currentShardId) + { + foundInList = true; + break; + } + } + + if (foundInList && + strcmp(step->sourceName, sourceName) == 0 && + step->sourcePort == sourcePort) + { + pg_atomic_write_u64(&step->updateStatus, status); + } + } + } + + DetachFromDSMSegments(segmentList); +} diff --git a/src/backend/distributed/progress/multi_progress.c b/src/backend/distributed/progress/multi_progress.c index 657f3356c..8a3adf4bc 100644 --- a/src/backend/distributed/progress/multi_progress.c +++ b/src/backend/distributed/progress/multi_progress.c @@ -133,6 +133,17 @@ FinalizeCurrentProgressMonitor(void) } +/* + * HasProgressMonitor returns true if there is a current progress monitor, + * by checking the variable currentProgressDSMHandle. Returns false otherwise. + */ +bool +HasProgressMonitor(void) +{ + return currentProgressDSMHandle != DSM_HANDLE_INVALID; +} + + /* * ProgressMonitorList returns the addresses of monitors of ongoing commands, associated * with the given identifier magic number. The function takes a pass in diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 850daca24..43a48c1c3 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -258,6 +258,12 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ CreateReplicaIdentities(logicalRepTargetList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + CopyShardsToNode(sourceNode, targetNode, shardList, snapshot); /* @@ -374,6 +380,12 @@ CompleteNonBlockingShardTransfer(List *shardList, /* Start applying the changes from the replication slots to catch up. */ EnableSubscriptions(logicalRepTargetList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceConnection->hostname, + sourceConnection->port, + PLACEMENT_UPDATE_STATUS_CATCHING_UP); + /* * The following check is a leftover from when used subscriptions with * copy_data=true. It's probably not really necessary anymore, but it @@ -392,12 +404,25 @@ CompleteNonBlockingShardTransfer(List *shardList, */ WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceConnection->hostname, + sourceConnection->port, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + /* * Now lets create the post-load objects, such as the indexes, constraints * and partitioning hierarchy. Once they are done, wait until the replication * catches up again. So we don't block writes too long. */ CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceConnection->hostname, + sourceConnection->port, + PLACEMENT_UPDATE_STATUS_FINAL_CATCH_UP); + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); @@ -419,6 +444,12 @@ CompleteNonBlockingShardTransfer(List *shardList, if (type != SHARD_SPLIT) { + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceConnection->hostname, + sourceConnection->port, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + /* * We're creating the foreign constraints to reference tables after the * data is already replicated and all the necessary locks are acquired. @@ -432,6 +463,12 @@ CompleteNonBlockingShardTransfer(List *shardList, CreateUncheckedForeignKeyConstraints(logicalRepTargetList); } + UpdatePlacementUpdateStatusForShardIntervalList( + shardList, + sourceConnection->hostname, + sourceConnection->port, + PLACEMENT_UPDATE_STATUS_COMPLETING); + /* we're done, cleanup the publication and subscription */ DropSubscriptions(logicalRepTargetList); DropReplicationSlots(sourceConnection, logicalRepTargetList); diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql index 66d4d4bec..4993141b1 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/11.2-1.sql @@ -14,7 +14,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() target_shard_size bigint, operation_type text, source_lsn pg_lsn, - target_lsn pg_lsn + target_lsn pg_lsn, + status text ) AS 'MODULE_PATHNAME' LANGUAGE C STRICT; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql index 66d4d4bec..4993141b1 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_progress/latest.sql @@ -14,7 +14,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_progress() target_shard_size bigint, operation_type text, source_lsn pg_lsn, - target_lsn pg_lsn + target_lsn pg_lsn, + status text ) AS 'MODULE_PATHNAME' LANGUAGE C STRICT; diff --git a/src/include/distributed/multi_progress.h b/src/include/distributed/multi_progress.h index 591116583..64bad527f 100644 --- a/src/include/distributed/multi_progress.h +++ b/src/include/distributed/multi_progress.h @@ -34,6 +34,7 @@ extern void RegisterProgressMonitor(uint64 progressTypeMagicNumber, dsm_handle dsmHandle); extern ProgressMonitorData * GetCurrentProgressMonitor(void); extern void FinalizeCurrentProgressMonitor(void); +extern bool HasProgressMonitor(void); extern List * ProgressMonitorList(uint64 commandTypeMagicNumber, List **attachedDSMSegmentList); extern void DetachFromDSMSegments(List *dsmSegmentList); diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 3abeb80fb..ac4864422 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -85,6 +85,19 @@ typedef enum PLACEMENT_UPDATE_COPY = 2 } PlacementUpdateType; +typedef enum +{ + PLACEMENT_UPDATE_STATUS_NOT_STARTED_YET = 0, + PLACEMENT_UPDATE_STATUS_SETTING_UP = 1, + PLACEMENT_UPDATE_STATUS_COPYING_DATA = 2, + PLACEMENT_UPDATE_STATUS_CATCHING_UP = 3, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS = 4, + PLACEMENT_UPDATE_STATUS_FINAL_CATCH_UP = 5, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS = 6, + PLACEMENT_UPDATE_STATUS_COMPLETING = 7, + PLACEMENT_UPDATE_STATUS_COMPLETED = 8, +} PlacementUpdateStatus; + /* * PlacementUpdateEvent represents a logical unit of work that copies or @@ -108,6 +121,7 @@ typedef struct PlacementUpdateEventProgress int targetPort; PlacementUpdateType updateType; pg_atomic_uint64 progress; + pg_atomic_uint64 updateStatus; } PlacementUpdateEventProgress; typedef struct NodeFillState @@ -199,6 +213,7 @@ extern void AcquirePlacementColocationLock(Oid relationId, int lockMode, extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId, - uint64 initialProgressState); + uint64 initialProgressState, + PlacementUpdateStatus initialStatus); #endif /* SHARD_REBALANCER_H */ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index eb845adc2..aa2bd7002 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * repair_shards.h + * shard_transfer.h * Code used to move shards around. * * Copyright (c) Citus Data, Inc. @@ -9,6 +9,7 @@ #include "postgres.h" +#include "distributed/shard_rebalancer.h" #include "nodes/pg_list.h" extern uint64 ShardListSizeInBytes(List *colocatedShardList, @@ -18,3 +19,7 @@ extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList, char *snapshotName); extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); extern bool RelationCanPublishAllModifications(Oid relationId); +extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalList, + char *sourceName, + int sourcePort, + PlacementUpdateStatus status); diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 69d0a9f91..2b29868c9 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -33,15 +33,16 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) step s2-unlock-1-start: @@ -81,10 +82,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -124,15 +126,16 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 0|localhost | 57638| 50000| 2|move |t |t |f |Completed +colocated2|1500005| 400000|localhost | 57637| 0|localhost | 57638| 400000| 2|move |t |t |f |Completed +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (4 rows) step s3-unlock-2-start: @@ -172,10 +175,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -227,15 +231,16 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) step s7-release-lock: @@ -275,10 +280,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -322,15 +328,16 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f -colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f -colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet +colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) step s6-release-advisory-lock: @@ -375,10 +382,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -418,13 +426,14 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (2 rows) step s2-unlock-1-start: @@ -456,10 +465,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -510,13 +520,14 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing (2 rows) step s7-release-lock: @@ -548,10 +559,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -592,13 +604,14 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|copy |t |t |f |Setting Up (2 rows) step s2-unlock-1-start: @@ -649,13 +662,14 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up (2 rows) step s6-release-advisory-lock: @@ -692,10 +706,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -739,13 +754,14 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up (2 rows) step s6-release-advisory-lock: @@ -802,14 +818,15 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 0| 1|move |t |t |f |Setting Up +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (3 rows) step s2-unlock-1-start: @@ -850,10 +867,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) @@ -908,14 +926,15 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f +colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Completing +colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Completing +separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f |Completing (3 rows) step s7-release-lock: @@ -956,10 +975,11 @@ step s7-get-progress: operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); -table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available +table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 40d8ccfd9..ee842efc8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1202,7 +1202,7 @@ SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | - | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn) + | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text) (2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 2a927ea18..9cdc0d4a3 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1044,7 +1044,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala -- Check that we can call this function SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn | status --------------------------------------------------------------------- (0 rows) @@ -1058,7 +1058,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); - sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn + sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size | operation_type | source_lsn | target_lsn | status --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index b9d10d047..c1a3eac3a 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -175,7 +175,8 @@ step "s7-get-progress" operation_type, source_lsn >= target_lsn as lsn_sanity_check, source_lsn > '0/0' as source_lsn_available, - target_lsn > '0/0' as target_lsn_available + target_lsn > '0/0' as target_lsn_available, + status FROM get_rebalance_progress(); }