mirror of https://github.com/citusdata/citus.git
Track rebalance progress at the shard move level (#6187)
We're in the processes of totally changing the shard rebalancer experience and infrastructure. Soon the shard rebalancer will include retries, crash recovery and support for running in the background. These improvements come at a cost though, the way the get_rebalance_progress UDF currently works is very hard to replicate with this new structure. This is mostly because the old behaviour doesn't really make sense anymore with this new infrastructure. A new and better way to track the progress will be included as part of the new infrastructure. This PR is in preparation of the new code rebalancer experience. It changes the get_rebalance_progress UDF to only display the moves that are in progress at the moment, not the ones that happened in the past or that are planned in the future. Another option would have been to completely remove the current get_rebalance_progress functionality and point people to the new way of tracking progress. But old blogposts still reference the old UDF and users might have some automation on top of it. Showing the progress of the current moves is fairly simple to achieve, even with the new infrastructure. So this PR is a kind of compromise: It doesn't have complete feature parity with the old get_rebalance_progress, but the most common use cases will still work. There's also an advantage of the change: You can now see progress of shard moves that were triggered by calling citus_move_shard_placement manually. Instead of only being able to see progress of moves that were initiated using get_rebalance_table_shards.pull/6194/head
parent
961fcff5db
commit
31faa88a4e
|
@ -36,6 +36,7 @@
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/multi_logical_replication.h"
|
#include "distributed/multi_logical_replication.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
|
#include "distributed/multi_progress.h"
|
||||||
#include "distributed/reference_table_utils.h"
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
@ -396,6 +397,17 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
||||||
EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
|
EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
|
||||||
targetNodeName, targetNodePort);
|
targetNodeName, targetNodePort);
|
||||||
|
|
||||||
|
|
||||||
|
WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
|
||||||
|
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
|
||||||
|
|
||||||
|
PlacementUpdateEvent *placementUpdateEvent = palloc0(sizeof(PlacementUpdateEvent));
|
||||||
|
placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE;
|
||||||
|
placementUpdateEvent->shardId = shardId;
|
||||||
|
placementUpdateEvent->sourceNode = sourceNode;
|
||||||
|
placementUpdateEvent->targetNode = targetNode;
|
||||||
|
SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* At this point of the shard moves, we don't need to block the writes to
|
* At this point of the shard moves, we don't need to block the writes to
|
||||||
* shards when logical replication is used.
|
* shards when logical replication is used.
|
||||||
|
@ -463,6 +475,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
||||||
sourceNodePort, targetNodeName,
|
sourceNodePort, targetNodeName,
|
||||||
targetNodePort);
|
targetNodePort);
|
||||||
|
|
||||||
|
FinalizeCurrentProgressMonitor();
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -201,8 +201,6 @@ static int PlacementsHashCompare(const void *lhsKey, const void *rhsKey, Size ke
|
||||||
static uint32 PlacementsHashHashCode(const void *key, Size keySize);
|
static uint32 PlacementsHashHashCode(const void *key, Size keySize);
|
||||||
static bool WorkerNodeListContains(List *workerNodeList, const char *workerName,
|
static bool WorkerNodeListContains(List *workerNodeList, const char *workerName,
|
||||||
uint32 workerPort);
|
uint32 workerPort);
|
||||||
static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName,
|
|
||||||
int sourcePort, uint64 progress);
|
|
||||||
static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
|
static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
|
||||||
static NodeFillState * FindFillStateForPlacement(RebalanceState *state,
|
static NodeFillState * FindFillStateForPlacement(RebalanceState *state,
|
||||||
ShardPlacement *placement);
|
ShardPlacement *placement);
|
||||||
|
@ -235,7 +233,6 @@ static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name);
|
||||||
static void EnsureShardCostUDF(Oid functionOid);
|
static void EnsureShardCostUDF(Oid functionOid);
|
||||||
static void EnsureNodeCapacityUDF(Oid functionOid);
|
static void EnsureNodeCapacityUDF(Oid functionOid);
|
||||||
static void EnsureShardAllowedOnNodeUDF(Oid functionOid);
|
static void EnsureShardAllowedOnNodeUDF(Oid functionOid);
|
||||||
static void ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId);
|
|
||||||
static HTAB * BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps,
|
static HTAB * BuildWorkerShardStatisticsHash(PlacementUpdateEventProgress *steps,
|
||||||
int stepCount);
|
int stepCount);
|
||||||
static HTAB * GetShardStatistics(MultiConnection *connection, HTAB *shardIds);
|
static HTAB * GetShardStatistics(MultiConnection *connection, HTAB *shardIds);
|
||||||
|
@ -261,15 +258,6 @@ PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
|
||||||
bool RunningUnderIsolationTest = false;
|
bool RunningUnderIsolationTest = false;
|
||||||
int MaxRebalancerLoggedIgnoredMoves = 5;
|
int MaxRebalancerLoggedIgnoredMoves = 5;
|
||||||
|
|
||||||
/*
|
|
||||||
* This is randomly generated hardcoded number. It's used as the first part of
|
|
||||||
* the advisory lock identifier that's used during isolation tests. See the
|
|
||||||
* comments on ConflictShardPlacementUpdateOnlyWithIsolationTesting, for more
|
|
||||||
* information.
|
|
||||||
*/
|
|
||||||
#define SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY 29279
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef USE_ASSERT_CHECKING
|
#ifdef USE_ASSERT_CHECKING
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -774,7 +762,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
|
||||||
* a magic number to the first progress field as an indicator. Finally we return the
|
* a magic number to the first progress field as an indicator. Finally we return the
|
||||||
* dsm handle so that it can be used for updating the progress and cleaning things up.
|
* dsm handle so that it can be used for updating the progress and cleaning things up.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
||||||
{
|
{
|
||||||
List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
|
List *colocatedUpdateList = GetColocatedRebalanceSteps(placementUpdateList);
|
||||||
|
@ -799,7 +787,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
|
||||||
event->shardId = colocatedUpdate->shardId;
|
event->shardId = colocatedUpdate->shardId;
|
||||||
event->sourcePort = colocatedUpdate->sourceNode->workerPort;
|
event->sourcePort = colocatedUpdate->sourceNode->workerPort;
|
||||||
event->targetPort = colocatedUpdate->targetNode->workerPort;
|
event->targetPort = colocatedUpdate->targetNode->workerPort;
|
||||||
pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_WAITING);
|
pg_atomic_init_u64(&event->progress, REBALANCE_PROGRESS_MOVING);
|
||||||
|
|
||||||
eventIndex++;
|
eventIndex++;
|
||||||
}
|
}
|
||||||
|
@ -1198,63 +1186,34 @@ BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics)
|
||||||
PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
|
PlacementUpdateEventProgress *step = placementUpdateEvents + eventIndex;
|
||||||
|
|
||||||
uint64 shardId = step->shardId;
|
uint64 shardId = step->shardId;
|
||||||
uint64 shardSize = 0;
|
|
||||||
uint64 backupShardSize = 0;
|
|
||||||
uint64 progress = pg_atomic_read_u64(&step->progress);
|
|
||||||
|
|
||||||
uint64 sourceSize = WorkerShardSize(shardStatistics, step->sourceName,
|
uint64 shardSize = WorkerShardSize(shardStatistics, step->sourceName,
|
||||||
step->sourcePort, shardId);
|
step->sourcePort, shardId);
|
||||||
uint64 targetSize = WorkerShardSize(shardStatistics, step->targetName,
|
|
||||||
step->targetPort, shardId);
|
|
||||||
|
|
||||||
if (progress == REBALANCE_PROGRESS_WAITING ||
|
|
||||||
progress == REBALANCE_PROGRESS_MOVING)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* If we are not done with the move, the correct shard size is the
|
|
||||||
* size on the source.
|
|
||||||
*/
|
|
||||||
shardSize = sourceSize;
|
|
||||||
backupShardSize = targetSize;
|
|
||||||
}
|
|
||||||
else if (progress == REBALANCE_PROGRESS_MOVED)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* If we are done with the move, the correct shard size is the size
|
|
||||||
* on the target
|
|
||||||
*/
|
|
||||||
shardSize = targetSize;
|
|
||||||
backupShardSize = sourceSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shardSize == 0)
|
if (shardSize == 0)
|
||||||
{
|
{
|
||||||
if (backupShardSize == 0)
|
/*
|
||||||
|
* It's possible that we are reading the sizes after the move has
|
||||||
|
* already fininshed. This means that the shards on the source
|
||||||
|
* might have already been deleted. In that case we instead report
|
||||||
|
* the size on the target as the shard size, since that is now the
|
||||||
|
* only existing shard.
|
||||||
|
*/
|
||||||
|
shardSize = WorkerShardSize(shardStatistics, step->targetName,
|
||||||
|
step->targetPort, shardId);
|
||||||
|
if (shardSize == 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We don't have any useful shard size. This can happen when a
|
* We don't have any useful shard size. This can happen when a
|
||||||
* shard is moved multiple times and it is not present on
|
* shard is moved multiple times and it is not present on
|
||||||
* either of these nodes. Probably the shard is on a worker
|
* either of these nodes. Probably the shard is on a worker
|
||||||
* related to another event. In the weird case that this shard
|
* related to the next move. In the weird case that this shard
|
||||||
* is on the nodes and actually is size 0, we will have no
|
* is on the nodes and actually is size 0, we will have no
|
||||||
* entry in the hashmap. When fetching from it we always
|
* entry in the hashmap. When fetching from it we always
|
||||||
* default to 0 if no entry is found, so that's fine.
|
* default to 0 if no entry is found, so that's fine.
|
||||||
*/
|
*/
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Because of the way we fetch shard sizes they are from a slightly
|
|
||||||
* earlier moment than the progress state we just read from shared
|
|
||||||
* memory. Usually this is no problem, but there exist some race
|
|
||||||
* conditions where this matters. For example, for very quick moves
|
|
||||||
* it is possible that even though a step is now reported as MOVED,
|
|
||||||
* when we read the shard sizes the move had not even started yet.
|
|
||||||
* This in turn can mean that the target size is 0 while the source
|
|
||||||
* size is not. We try to handle such rare edge cases by falling
|
|
||||||
* back on the other shard size if that one is not 0.
|
|
||||||
*/
|
|
||||||
shardSize = backupShardSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1468,15 +1427,6 @@ GetMovedShardIdsByWorker(PlacementUpdateEventProgress *steps, int stepCount,
|
||||||
AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort,
|
AddToWorkerShardIdSet(shardsByWorker, step->sourceName, step->sourcePort,
|
||||||
step->shardId);
|
step->shardId);
|
||||||
|
|
||||||
if (pg_atomic_read_u64(&step->progress) == REBALANCE_PROGRESS_WAITING)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* shard move has not started so we don't need target stats for
|
|
||||||
* this shard
|
|
||||||
*/
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort,
|
AddToWorkerShardIdSet(shardsByWorker, step->targetName, step->targetPort,
|
||||||
step->shardId);
|
step->shardId);
|
||||||
}
|
}
|
||||||
|
@ -1609,48 +1559,11 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
|
||||||
* This uses the first relationId from the list, it's only used for display
|
* This uses the first relationId from the list, it's only used for display
|
||||||
* purposes so it does not really matter which to show
|
* purposes so it does not really matter which to show
|
||||||
*/
|
*/
|
||||||
SetupRebalanceMonitor(placementUpdateList, linitial_oid(options->relationIdList));
|
|
||||||
ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving");
|
ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving");
|
||||||
FinalizeCurrentProgressMonitor();
|
FinalizeCurrentProgressMonitor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ConflictShardPlacementUpdateOnlyWithIsolationTesting is only useful for
|
|
||||||
* testing and should not be called by any code-path except for
|
|
||||||
* UpdateShardPlacement().
|
|
||||||
*
|
|
||||||
* To be able to test the rebalance monitor functionality correctly, we need to
|
|
||||||
* be able to pause the rebalancer at a specific place in time. We cannot do
|
|
||||||
* this by block the shard move itself someway (e.g. by calling truncate on the
|
|
||||||
* distributed table). The reason for this is that we do the shard move in a
|
|
||||||
* newly opened connection. This causes our isolation tester block detection to
|
|
||||||
* not realise that the rebalance_table_shards call is blocked.
|
|
||||||
*
|
|
||||||
* So instead, before opening a connection we lock an advisory lock that's
|
|
||||||
* based on the shard id (shard id mod 1000). By locking this advisory lock in
|
|
||||||
* a different session we can block the rebalancer in a way that the isolation
|
|
||||||
* tester block detection is able to detect.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ConflictShardPlacementUpdateOnlyWithIsolationTesting(uint64 shardId)
|
|
||||||
{
|
|
||||||
LOCKTAG tag;
|
|
||||||
const bool sessionLock = false;
|
|
||||||
const bool dontWait = false;
|
|
||||||
|
|
||||||
if (RunningUnderIsolationTest)
|
|
||||||
{
|
|
||||||
/* we've picked a random lock */
|
|
||||||
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId,
|
|
||||||
SHARD_PLACEMENT_UPDATE_ADVISORY_LOCK_FIRST_KEY,
|
|
||||||
shardId % 1000, 2);
|
|
||||||
|
|
||||||
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateShardPlacement copies or moves a shard placement by calling
|
* UpdateShardPlacement copies or moves a shard placement by calling
|
||||||
* the corresponding functions in Citus in a subtransaction.
|
* the corresponding functions in Citus in a subtransaction.
|
||||||
|
@ -1722,23 +1635,11 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
|
||||||
errmsg("only moving or copying shards is supported")));
|
errmsg("only moving or copying shards is supported")));
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateColocatedShardPlacementProgress(shardId,
|
|
||||||
sourceNode->workerName,
|
|
||||||
sourceNode->workerPort,
|
|
||||||
REBALANCE_PROGRESS_MOVING);
|
|
||||||
|
|
||||||
ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In case of failure, we throw an error such that rebalance_table_shards
|
* In case of failure, we throw an error such that rebalance_table_shards
|
||||||
* fails early.
|
* fails early.
|
||||||
*/
|
*/
|
||||||
ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data);
|
ExecuteRebalancerCommandInSeparateTransaction(placementUpdateCommand->data);
|
||||||
|
|
||||||
UpdateColocatedShardPlacementProgress(shardId,
|
|
||||||
sourceNode->workerName,
|
|
||||||
sourceNode->workerPort,
|
|
||||||
REBALANCE_PROGRESS_MOVED);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2799,51 +2700,6 @@ WorkerNodeListContains(List *workerNodeList, const char *workerName, uint32 work
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* UpdateColocatedShardPlacementProgress updates the progress of the given placement,
|
|
||||||
* along with its colocated placements, to the given state.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName, int sourcePort,
|
|
||||||
uint64 progress)
|
|
||||||
{
|
|
||||||
ProgressMonitorData *header = GetCurrentProgressMonitor();
|
|
||||||
|
|
||||||
if (header != NULL)
|
|
||||||
{
|
|
||||||
PlacementUpdateEventProgress *steps = ProgressMonitorSteps(header);
|
|
||||||
ListCell *colocatedShardIntervalCell = NULL;
|
|
||||||
|
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
|
||||||
List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval);
|
|
||||||
|
|
||||||
for (int moveIndex = 0; moveIndex < header->stepCount; moveIndex++)
|
|
||||||
{
|
|
||||||
PlacementUpdateEventProgress *step = steps + moveIndex;
|
|
||||||
uint64 currentShardId = step->shardId;
|
|
||||||
bool colocatedShard = false;
|
|
||||||
|
|
||||||
foreach(colocatedShardIntervalCell, colocatedShardIntervalList)
|
|
||||||
{
|
|
||||||
ShardInterval *candidateShard = lfirst(colocatedShardIntervalCell);
|
|
||||||
if (candidateShard->shardId == currentShardId)
|
|
||||||
{
|
|
||||||
colocatedShard = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (colocatedShard &&
|
|
||||||
strcmp(step->sourceName, sourceName) == 0 &&
|
|
||||||
step->sourcePort == sourcePort)
|
|
||||||
{
|
|
||||||
pg_atomic_write_u64(&step->progress, progress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* pg_dist_rebalance_strategy_enterprise_check is a now removed function, but
|
* pg_dist_rebalance_strategy_enterprise_check is a now removed function, but
|
||||||
* to avoid issues during upgrades a C stub is kept.
|
* to avoid issues during upgrades a C stub is kept.
|
||||||
|
|
|
@ -73,10 +73,7 @@
|
||||||
/* *INDENT-ON* */
|
/* *INDENT-ON* */
|
||||||
|
|
||||||
#define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337
|
#define REBALANCE_ACTIVITY_MAGIC_NUMBER 1337
|
||||||
#define REBALANCE_PROGRESS_ERROR -1
|
|
||||||
#define REBALANCE_PROGRESS_WAITING 0
|
|
||||||
#define REBALANCE_PROGRESS_MOVING 1
|
#define REBALANCE_PROGRESS_MOVING 1
|
||||||
#define REBALANCE_PROGRESS_MOVED 2
|
|
||||||
|
|
||||||
/* Enumeration that defines different placement update types */
|
/* Enumeration that defines different placement update types */
|
||||||
typedef enum
|
typedef enum
|
||||||
|
@ -196,5 +193,7 @@ extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlace
|
||||||
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
|
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
|
||||||
extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
|
extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
|
||||||
const char *operationName);
|
const char *operationName);
|
||||||
|
extern void SetupRebalanceMonitor(List *placementUpdateList, Oid relationId);
|
||||||
|
|
||||||
|
|
||||||
#endif /* SHARD_REBALANCER_H */
|
#endif /* SHARD_REBALANCER_H */
|
||||||
|
|
|
@ -1,34 +1,23 @@
|
||||||
Parsed test spec with 3 sessions
|
Parsed test spec with 7 sessions
|
||||||
|
|
||||||
starting permutation: s2-lock-1 s2-lock-2 s1-rebalance-c1 s3-progress s2-unlock-1 s3-progress s2-unlock-2 s3-progress s1-commit s3-progress
|
starting permutation: s2-lock-1-start s1-rebalance-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop
|
||||||
master_set_node_property
|
master_set_node_property
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s2-lock-1:
|
step s2-lock-1-start:
|
||||||
SELECT pg_advisory_lock(29279, 1);
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 1;
|
||||||
|
DELETE FROM separate WHERE test_id = 1;
|
||||||
|
|
||||||
pg_advisory_lock
|
step s1-rebalance-c1-block-writes:
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s2-lock-2:
|
|
||||||
SELECT pg_advisory_lock(29279, 2);
|
|
||||||
|
|
||||||
pg_advisory_lock
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s1-rebalance-c1:
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s3-progress:
|
step s7-get-progress:
|
||||||
set client_min_messages=NOTICE;
|
set LOCAL client_min_messages=NOTICE;
|
||||||
SELECT
|
SELECT
|
||||||
table_name,
|
table_name,
|
||||||
shardid,
|
shardid,
|
||||||
|
@ -46,50 +35,12 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1
|
||||||
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1
|
||||||
colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 0
|
(2 rows)
|
||||||
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 0
|
|
||||||
(4 rows)
|
|
||||||
|
|
||||||
step s2-unlock-1:
|
step s2-unlock-1-start:
|
||||||
SELECT pg_advisory_unlock(29279, 1);
|
ROLLBACK;
|
||||||
|
|
||||||
pg_advisory_unlock
|
step s1-rebalance-c1-block-writes: <... completed>
|
||||||
---------------------------------------------------------------------
|
|
||||||
t
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s3-progress:
|
|
||||||
set client_min_messages=NOTICE;
|
|
||||||
SELECT
|
|
||||||
table_name,
|
|
||||||
shardid,
|
|
||||||
shard_size,
|
|
||||||
sourcename,
|
|
||||||
sourceport,
|
|
||||||
source_shard_size,
|
|
||||||
targetname,
|
|
||||||
targetport,
|
|
||||||
target_shard_size,
|
|
||||||
progress
|
|
||||||
FROM get_rebalance_progress();
|
|
||||||
|
|
||||||
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
colocated1|1500001| 73728|localhost | 57637| 49152|localhost | 57638| 73728| 2
|
|
||||||
colocated2|1500005| 401408|localhost | 57637| 376832|localhost | 57638| 401408| 2
|
|
||||||
colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1
|
|
||||||
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1
|
|
||||||
(4 rows)
|
|
||||||
|
|
||||||
step s2-unlock-2:
|
|
||||||
SELECT pg_advisory_unlock(29279, 2);
|
|
||||||
|
|
||||||
pg_advisory_unlock
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
t
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
step s1-rebalance-c1: <... completed>
|
|
||||||
table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport
|
table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
colocated1|1500001| 0|localhost | 57637|localhost | 57638
|
colocated1|1500001| 0|localhost | 57637|localhost | 57638
|
||||||
|
@ -103,8 +54,11 @@ rebalance_table_shards
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
step s3-progress:
|
step s1-commit:
|
||||||
set client_min_messages=NOTICE;
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
SELECT
|
SELECT
|
||||||
table_name,
|
table_name,
|
||||||
shardid,
|
shardid,
|
||||||
|
@ -122,11 +76,68 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s3-lock-2-start s1-rebalance-c1-block-writes s7-get-progress s3-unlock-2-start s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s3-lock-2-start:
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 3;
|
||||||
|
|
||||||
|
step s1-rebalance-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500002| 196608|localhost | 57637| 196608|localhost | 57638| 0| 1
|
||||||
|
colocated2|1500006| 8192|localhost | 57637| 8192|localhost | 57638| 0| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s3-unlock-2-start:
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
step s1-rebalance-c1-block-writes: <... completed>
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500005| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated1|1500002| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500006| 0|localhost | 57637|localhost | 57638
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
rebalance_table_shards
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
step s1-commit:
|
step s1-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
step s3-progress:
|
step s7-get-progress:
|
||||||
set client_min_messages=NOTICE;
|
set LOCAL client_min_messages=NOTICE;
|
||||||
SELECT
|
SELECT
|
||||||
table_name,
|
table_name,
|
||||||
shardid,
|
shardid,
|
||||||
|
@ -144,3 +155,762 @@ table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s7-grab-lock s1-rebalance-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s7-grab-lock:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-rebalance-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s7-release-lock:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-rebalance-c1-block-writes: <... completed>
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500005| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated1|1500002| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500006| 0|localhost | 57637|localhost | 57638
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
rebalance_table_shards
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s6-acquire-advisory-lock s1-rebalance-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s6-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-rebalance-c1-online:
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s6-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-rebalance-c1-online: <... completed>
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|targetname|targetport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500005| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated1|1500002| 0|localhost | 57637|localhost | 57638
|
||||||
|
colocated2|1500006| 0|localhost | 57637|localhost | 57638
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
rebalance_table_shards
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s7-grab-lock s1-shard-move-c1-online s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s7-grab-lock:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s7-release-lock:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s7-get-progress s2-unlock-1-start s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-lock-1-start:
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 1;
|
||||||
|
DELETE FROM separate WHERE test_id = 1;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s2-unlock-1-start:
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s7-grab-lock:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s7-release-lock:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s6-acquire-advisory-lock s1-shard-move-c1-online s7-get-progress s6-release-advisory-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s6-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 8192| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 8192| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s6-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s7-grab-lock s1-shard-move-c1-online s7-get-progress s7-release-lock s1-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s7-grab-lock:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
step s7-release-lock:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-online: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s2-lock-1-start s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s2-unlock-1-start s1-commit s4-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s2-lock-1-start:
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 1;
|
||||||
|
DELETE FROM separate WHERE test_id = 1;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s4-shard-move-sep-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 0| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 0| 1
|
||||||
|
separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 0| 1
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s2-unlock-1-start:
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s4-shard-move-sep-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s4-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s7-grab-lock s1-shard-move-c1-block-writes s4-shard-move-sep-block-writes s7-get-progress s7-release-lock s1-commit s4-commit s7-get-progress enable-deferred-drop
|
||||||
|
master_set_node_property
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s7-grab-lock:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s4-shard-move-sep-block-writes:
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
colocated1|1500001| 49152|localhost | 57637| 49152|localhost | 57638| 73728| 1
|
||||||
|
colocated2|1500005| 376832|localhost | 57637| 376832|localhost | 57638| 401408| 1
|
||||||
|
separate |1500009| 122880|localhost | 57637| 122880|localhost | 57638| 147456| 1
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
step s7-release-lock:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-shard-move-c1-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s4-shard-move-sep-block-writes: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s4-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s7-get-progress:
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
|
SELECT
|
||||||
|
table_name,
|
||||||
|
shardid,
|
||||||
|
shard_size,
|
||||||
|
sourcename,
|
||||||
|
sourceport,
|
||||||
|
source_shard_size,
|
||||||
|
targetname,
|
||||||
|
targetport,
|
||||||
|
target_shard_size,
|
||||||
|
progress
|
||||||
|
FROM get_rebalance_progress();
|
||||||
|
|
||||||
|
table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
step enable-deferred-drop:
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,14 @@
|
||||||
setup
|
setup
|
||||||
{
|
{
|
||||||
select setval('pg_dist_shardid_seq', GREATEST(1500000, nextval('pg_dist_shardid_seq')));
|
-- We disable deffered drop, so we can easily trigger blocking the shard
|
||||||
|
-- move at the end of the move. This is done in a separate setup step,
|
||||||
|
-- because this cannot run in a transaction.
|
||||||
|
ALTER SYSTEM SET citus.defer_drop_after_shard_move TO OFF;
|
||||||
|
}
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1500001;
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SELECT 1 FROM master_add_node('localhost', 57637);
|
SELECT 1 FROM master_add_node('localhost', 57637);
|
||||||
|
@ -9,29 +17,53 @@ setup
|
||||||
SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none');
|
SELECT create_distributed_table('colocated1', 'test_id', 'hash', 'none');
|
||||||
CREATE TABLE colocated2 (test_id integer NOT NULL, data text);
|
CREATE TABLE colocated2 (test_id integer NOT NULL, data text);
|
||||||
SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1');
|
SELECT create_distributed_table('colocated2', 'test_id', 'hash', 'colocated1');
|
||||||
|
CREATE TABLE separate (test_id integer NOT NULL, data text);
|
||||||
|
SELECT create_distributed_table('separate', 'test_id', 'hash', 'none');
|
||||||
-- 1 and 3 are chosen so they go to shard 1 and 2
|
-- 1 and 3 are chosen so they go to shard 1 and 2
|
||||||
INSERT INTO colocated1(test_id) SELECT 1 from generate_series(0, 1000) i;
|
INSERT INTO colocated1(test_id) SELECT 1 from generate_series(0, 1000) i;
|
||||||
INSERT INTO colocated2(test_id) SELECT 1 from generate_series(0, 10000) i;
|
INSERT INTO colocated2(test_id) SELECT 1 from generate_series(0, 10000) i;
|
||||||
INSERT INTO colocated1(test_id) SELECT 3 from generate_series(0, 5000) i;
|
INSERT INTO colocated1(test_id) SELECT 3 from generate_series(0, 5000) i;
|
||||||
|
INSERT INTO separate(test_id) SELECT 1 from generate_series(0, 3000) i;
|
||||||
select * from pg_dist_placement;
|
select * from pg_dist_placement;
|
||||||
SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', true);
|
SELECT master_set_node_property('localhost', 57638, 'shouldhaveshards', true);
|
||||||
}
|
}
|
||||||
|
|
||||||
teardown
|
teardown
|
||||||
{
|
{
|
||||||
|
SELECT pg_reload_conf();
|
||||||
DROP TABLE colocated2;
|
DROP TABLE colocated2;
|
||||||
DROP TABLE colocated1;
|
DROP TABLE colocated1;
|
||||||
|
DROP TABLE separate;
|
||||||
}
|
}
|
||||||
|
|
||||||
session "s1"
|
session "s1"
|
||||||
|
|
||||||
step "s1-rebalance-c1"
|
step "s1-rebalance-c1-block-writes"
|
||||||
{
|
{
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='block_writes');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-rebalance-c1-online"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM get_rebalance_table_shards_plan('colocated1');
|
||||||
|
SELECT rebalance_table_shards('colocated1', shard_transfer_mode:='force_logical');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-shard-move-c1-block-writes"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-shard-move-c1-online"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500001, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-commit"
|
step "s1-commit"
|
||||||
{
|
{
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -39,31 +71,81 @@ step "s1-commit"
|
||||||
|
|
||||||
session "s2"
|
session "s2"
|
||||||
|
|
||||||
step "s2-lock-1"
|
step "s2-lock-1-start"
|
||||||
{
|
{
|
||||||
SELECT pg_advisory_lock(29279, 1);
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 1;
|
||||||
|
DELETE FROM separate WHERE test_id = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s2-lock-2"
|
step "s2-unlock-1-start"
|
||||||
{
|
{
|
||||||
SELECT pg_advisory_lock(29279, 2);
|
ROLLBACK;
|
||||||
}
|
|
||||||
|
|
||||||
step "s2-unlock-1"
|
|
||||||
{
|
|
||||||
SELECT pg_advisory_unlock(29279, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
step "s2-unlock-2"
|
|
||||||
{
|
|
||||||
SELECT pg_advisory_unlock(29279, 2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
session "s3"
|
session "s3"
|
||||||
|
|
||||||
step "s3-progress"
|
step "s3-lock-2-start"
|
||||||
{
|
{
|
||||||
set client_min_messages=NOTICE;
|
BEGIN;
|
||||||
|
DELETE FROM colocated1 WHERE test_id = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-unlock-2-start"
|
||||||
|
{
|
||||||
|
ROLLBACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s4"
|
||||||
|
|
||||||
|
step "s4-shard-move-sep-block-writes"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_move_shard_placement(1500009, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='block_writes');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s4-commit"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s6"
|
||||||
|
|
||||||
|
// this advisory lock with (almost) random values are only used
|
||||||
|
// for testing purposes. For details, check Citus' logical replication
|
||||||
|
// source code
|
||||||
|
step "s6-acquire-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s6-release-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
session "s7"
|
||||||
|
|
||||||
|
// get_rebalance_progress internally calls pg_total_relation_size on all the
|
||||||
|
// shards. This means that it takes AccessShareLock on those shards. Because we
|
||||||
|
// run with deferred drop that means that get_rebalance_progress actually waits
|
||||||
|
// for the shard move to complete the drop. But we want to get the progress
|
||||||
|
// before the shards are dropped. So we grab the locks first with a simple
|
||||||
|
// query that reads from all shards. We force using a single connection because
|
||||||
|
// get_rebalance_progress isn't smart enough to reuse the right connection for
|
||||||
|
// the right shards and will simply use a single one for all of them.
|
||||||
|
step "s7-grab-lock"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_adaptive_executor_pool_size = 1;
|
||||||
|
SELECT 1 FROM colocated1 LIMIT 1;
|
||||||
|
SELECT 1 FROM separate LIMIT 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s7-get-progress"
|
||||||
|
{
|
||||||
|
set LOCAL client_min_messages=NOTICE;
|
||||||
SELECT
|
SELECT
|
||||||
table_name,
|
table_name,
|
||||||
shardid,
|
shardid,
|
||||||
|
@ -78,4 +160,39 @@ step "s3-progress"
|
||||||
FROM get_rebalance_progress();
|
FROM get_rebalance_progress();
|
||||||
}
|
}
|
||||||
|
|
||||||
permutation "s2-lock-1" "s2-lock-2" "s1-rebalance-c1" "s3-progress" "s2-unlock-1" "s3-progress" "s2-unlock-2" "s3-progress" "s1-commit" "s3-progress"
|
step "s7-release-lock"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s8"
|
||||||
|
|
||||||
|
// After running these tests we want to enable deferred-drop again. Sadly
|
||||||
|
// the isolation tester framework does not support multiple teardown steps
|
||||||
|
// and this cannot be run in a transaction. So we need to do it manually at
|
||||||
|
// the end of the last test.
|
||||||
|
step "enable-deferred-drop"
|
||||||
|
{
|
||||||
|
ALTER SYSTEM RESET citus.defer_drop_after_shard_move;
|
||||||
|
}
|
||||||
|
// blocking rebalancer does what it should
|
||||||
|
permutation "s2-lock-1-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s3-lock-2-start" "s1-rebalance-c1-block-writes" "s7-get-progress" "s3-unlock-2-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s7-grab-lock" "s1-rebalance-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
|
||||||
|
// online rebalancer
|
||||||
|
permutation "s6-acquire-advisory-lock" "s1-rebalance-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
|
||||||
|
// blocking shard move
|
||||||
|
permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
|
||||||
|
// online shard move
|
||||||
|
permutation "s6-acquire-advisory-lock" "s1-shard-move-c1-online" "s7-get-progress" "s6-release-advisory-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s7-grab-lock" "s1-shard-move-c1-online" "s7-get-progress" "s7-release-lock" "s1-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
|
||||||
|
|
||||||
|
// parallel blocking shard move
|
||||||
|
permutation "s2-lock-1-start" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s2-unlock-1-start" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
permutation "s7-grab-lock" "s1-shard-move-c1-block-writes" "s4-shard-move-sep-block-writes" "s7-get-progress" "s7-release-lock" "s1-commit" "s4-commit" "s7-get-progress" "enable-deferred-drop"
|
||||||
|
|
Loading…
Reference in New Issue