Feature: run rebalancer in the background (#6215)

DESCRIPTION: Add a rebalancer that uses background tasks for its
execution

Based on the baclground jobs and tasks introduced in #6296 we implement
a new rebalancer on top of the primitives of background execution. This
allows the user to initiate a rebalance and let Citus execute the long
running steps in the background until completion.

Users can invoke the new background rebalancer with `SELECT
citus_rebalance_start();`. It will output information on its job id and
how to track progress. Also it returns its job id for automation
purposes. If you simply want to wait till the rebalance is done you can
use `SELECT citus_rebalance_wait();`

A running rebalance can be canelled/stopped with `SELECT
citus_rebalance_stop();`.
pull/6324/head
Nils Dijk 2022-09-12 19:46:53 +02:00 committed by GitHub
parent 48f7d6c279
commit cda3686d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 774 additions and 13 deletions

View File

@ -2724,6 +2724,79 @@ GetNextBackgroundTaskTaskId(void)
}
/*
* HasNonTerminalJobOfType returns true if there is a job of a given type that is not in
* its terminal state.
*
* Some jobs would want a single instance to be able to run at once. Before submitting a
* new job if could see if there is a job of their type already executing.
*
* If a job is found the options jobIdOut is populated with the jobId.
*/
bool
HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut)
{
Relation pgDistBackgroundJob =
table_open(DistBackgroundJobRelationId(), AccessShareLock);
/* find any job in states listed here */
BackgroundJobStatus jobStatus[] = {
BACKGROUND_JOB_STATUS_RUNNING,
BACKGROUND_JOB_STATUS_CANCELLING,
BACKGROUND_JOB_STATUS_FAILING,
BACKGROUND_JOB_STATUS_SCHEDULED
};
NameData jobTypeName = { 0 };
namestrcpy(&jobTypeName, jobType);
bool foundJob = false;
for (int i = 0; !foundJob && i < lengthof(jobStatus); i++)
{
ScanKeyData scanKey[2] = { 0 };
const bool indexOK = true;
/* pg_dist_background_job.status == jobStatus[i] */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_job_state,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(BackgroundJobStatusOid(jobStatus[i])));
/* pg_dist_background_job.job_type == jobType */
ScanKeyInit(&scanKey[1], Anum_pg_dist_background_job_job_type,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(&jobTypeName));
SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundJob,
InvalidOid, /* TODO use an actual index here */
indexOK, NULL, lengthof(scanKey), scanKey);
HeapTuple taskTuple = NULL;
if (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
foundJob = true;
if (jobIdOut)
{
Datum values[Natts_pg_dist_background_job] = { 0 };
bool isnull[Natts_pg_dist_background_job] = { 0 };
TupleDesc tupleDesc = RelationGetDescr(pgDistBackgroundJob);
heap_deform_tuple(taskTuple, tupleDesc, values, isnull);
*jobIdOut = DatumGetInt64(values[Anum_pg_dist_background_job_job_id - 1]);
}
}
systable_endscan(scanDescriptor);
}
table_close(pgDistBackgroundJob, NoLock);
return foundJob;
}
/*
* CreateBackgroundJob is a helper function to insert a new Background Job into Citus'
* catalog. After inserting the new job's metadataa into the catalog it returns the job_id

View File

@ -26,6 +26,7 @@
#include "commands/dbcommands.h"
#include "commands/sequence.h"
#include "distributed/argutils.h"
#include "distributed/background_jobs.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
@ -73,6 +74,7 @@ typedef struct RebalanceOptions
bool drainOnly;
float4 improvementThreshold;
Form_pg_dist_rebalance_strategy rebalanceStrategy;
const char *operationName;
} RebalanceOptions;
@ -227,6 +229,8 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context);
static ShardCost GetShardCost(uint64 shardId, void *context);
static List * NonColocatedDistRelationIdList(void);
static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid
shardReplicationModeOid);
static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
shardReplicationModeOid, char *noticeOperation);
@ -245,6 +249,8 @@ static uint64 WorkerShardSize(HTAB *workerShardStatistics,
static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort,
uint64 shardId);
static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
static void ErrorOnConcurrentRebalance(RebalanceOptions *);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(rebalance_table_shards);
@ -256,6 +262,9 @@ PG_FUNCTION_INFO_V1(master_drain_node);
PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size);
PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions);
PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
PG_FUNCTION_INFO_V1(citus_rebalance_start);
PG_FUNCTION_INFO_V1(citus_rebalance_stop);
PG_FUNCTION_INFO_V1(citus_rebalance_wait);
bool RunningUnderIsolationTest = false;
int MaxRebalancerLoggedIgnoredMoves = 5;
@ -858,6 +867,93 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
}
/*
* citus_rebalance_start rebalances the shards across the workers.
*
* SQL signature:
*
* citus_rebalance_start(
* rebalance_strategy name DEFAULT NULL,
* drain_only boolean DEFAULT false,
* shard_transfer_mode citus.shard_transfer_mode default 'auto'
* ) RETURNS VOID
*/
Datum
citus_rebalance_start(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
List *relationIdList = NonColocatedDistRelationIdList();
Form_pg_dist_rebalance_strategy strategy =
GetRebalanceStrategy(PG_GETARG_NAME_OR_NULL(0));
PG_ENSURE_ARGNOTNULL(1, "drain_only");
bool drainOnly = PG_GETARG_BOOL(1);
PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode");
Oid shardTransferModeOid = PG_GETARG_OID(2);
RebalanceOptions options = {
.relationIdList = relationIdList,
.threshold = strategy->defaultThreshold,
.maxShardMoves = 10000000,
.excludedShardArray = construct_empty_array(INT4OID),
.drainOnly = drainOnly,
.rebalanceStrategy = strategy,
.improvementThreshold = strategy->improvementThreshold,
};
int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid);
if (jobId == 0)
{
PG_RETURN_NULL();
}
PG_RETURN_INT64(jobId);
}
/*
* citus_rebalance_stop stops any ongoing background rebalance that is executing.
* Raises an error when there is no backgound rebalance ongoing at the moment.
*/
Datum
citus_rebalance_stop(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int64 jobId = 0;
if (!HasNonTerminalJobOfType("rebalance", &jobId))
{
ereport(ERROR, (errmsg("no ongoing rebalance that can be stopped")));
}
DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId));
PG_RETURN_VOID();
}
/*
* citus_rebalance_wait waits till an ongoing background rebalance has finished execution.
* A warning will be displayed if no rebalance is ongoing.
*/
Datum
citus_rebalance_wait(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int64 jobId = 0;
if (!HasNonTerminalJobOfType("rebalance", &jobId))
{
ereport(WARNING, (errmsg("no ongoing rebalance that can be waited on")));
PG_RETURN_VOID();
}
citus_job_wait_internal(jobId, NULL);
PG_RETURN_VOID();
}
/*
* GetRebalanceStrategy returns the rebalance strategy from
* pg_dist_rebalance_strategy matching the given name. If name is NULL it
@ -1579,17 +1675,14 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
return;
}
Oid relationId = InvalidOid;
char *operationName = "rebalance";
if (options->drainOnly)
{
operationName = "move";
}
foreach_oid(relationId, options->relationIdList)
{
AcquireRebalanceColocationLock(relationId, operationName);
}
options->operationName = operationName;
ErrorOnConcurrentRebalance(options);
List *placementUpdateList = GetRebalanceSteps(options);
@ -1609,6 +1702,168 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
}
/*
* ErrorOnConcurrentRebalance raises an error with extra information when there is already
* a rebalance running.
*/
static void
ErrorOnConcurrentRebalance(RebalanceOptions *options)
{
Oid relationId = InvalidOid;
foreach_oid(relationId, options->relationIdList)
{
/* this provides the legacy error when the lock can't be acquired */
AcquireRebalanceColocationLock(relationId, options->operationName);
}
int64 jobId = 0;
if (HasNonTerminalJobOfType("rebalance", &jobId))
{
ereport(ERROR, (
errmsg("A rebalance is already running as job %ld", jobId),
errdetail("A rebalance was already scheduled as background job"),
errhint("To monitor progress, run: SELECT * FROM "
"pg_dist_background_task WHERE job_id = %ld ORDER BY task_id "
"ASC; or SELECT * FROM get_rebalance_progress();",
jobId)));
}
}
/*
* RebalanceTableShardsBackground rebalances the shards for the relations
* inside the relationIdList across the different workers. It does so using our
* background job+task infrastructure.
*/
static int64
RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid)
{
if (list_length(options->relationIdList) == 0)
{
ereport(NOTICE, (errmsg("No tables to rebalance")));
return 0;
}
char *operationName = "rebalance";
if (options->drainOnly)
{
operationName = "move";
}
options->operationName = operationName;
ErrorOnConcurrentRebalance(options);
const char shardTransferMode = LookupShardTransferMode(shardReplicationModeOid);
List *colocatedTableList = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, options->relationIdList)
{
colocatedTableList = list_concat(colocatedTableList,
ColocatedTableList(relationId));
}
Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
{
EnsureTableOwner(colocatedTableId);
}
if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
{
/* make sure that all tables included in the rebalance have a replica identity*/
VerifyTablesHaveReplicaIdentity(colocatedTableList);
}
List *placementUpdateList = GetRebalanceSteps(options);
if (list_length(placementUpdateList) == 0)
{
ereport(NOTICE, (errmsg("No moves available for rebalancing")));
return 0;
}
DropOrphanedShardsInSeparateTransaction();
/* find the name of the shard transfer mode to interpolate in the scheduled command */
Datum shardTranferModeLabelDatum =
DirectFunctionCall1(enum_out, shardReplicationModeOid);
char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum);
/* schedule planned moves */
int64 jobId = CreateBackgroundJob("rebalance", "Rebalance all colocation groups");
/* buffer used to construct the sql command for the tasks */
StringInfoData buf = { 0 };
initStringInfo(&buf);
/*
* Currently we only have two tasks that any move can depend on:
* - replicating reference tables
* - the previous move
*
* prevJobIdx tells what slot to write the id of the task into. We only use both slots
* if we are actually replicating reference tables.
*/
int64 prevJobId[2] = { 0 };
int prevJobIdx = 0;
List *referenceTableIdList = NIL;
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
VerifyTablesHaveReplicaIdentity(referenceTableIdList);
/*
* Reference tables need to be copied to (newly-added) nodes, this needs to be the
* first task before we can move any other table.
*/
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevJobIdx] = task->taskid;
prevJobIdx++;
}
PlacementUpdateEvent *move = NULL;
bool first = true;
int prevMoveIndex = prevJobIdx;
foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"SELECT pg_catalog.citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)",
move->shardId,
quote_literal_cstr(move->sourceNode->workerName),
move->sourceNode->workerPort,
quote_literal_cstr(move->targetNode->workerName),
move->targetNode->workerPort,
quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
prevJobIdx, prevJobId);
prevJobId[prevMoveIndex] = task->taskid;
if (first)
{
first = false;
prevJobIdx++;
}
}
ereport(NOTICE,
(errmsg("Scheduled %d moves as job %ld",
list_length(placementUpdateList), jobId),
errdetail("Rebalance scheduled as background job"),
errhint("To monitor progress, run: "
"SELECT * FROM pg_dist_background_task WHERE job_id = %ld ORDER BY "
"task_id ASC; or SELECT * FROM get_rebalance_progress();",
jobId)));
return jobId;
}
/*
* UpdateShardPlacement copies or moves a shard placement by calling
* the corresponding functions in Citus in a subtransaction.

View File

@ -170,3 +170,6 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC;
#include "udfs/citus_job_wait/11.1-1.sql"
#include "udfs/citus_job_cancel/11.1-1.sql"
#include "udfs/citus_rebalance_start/11.1-1.sql"
#include "udfs/citus_rebalance_stop/11.1-1.sql"
#include "udfs/citus_rebalance_wait/11.1-1.sql"

View File

@ -108,6 +108,10 @@ DROP TABLE pg_catalog.pg_dist_cleanup;
DROP SEQUENCE pg_catalog.pg_dist_operationid_seq;
DROP SEQUENCE pg_catalog.pg_dist_cleanup_recordid_seq;
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_resources();
DROP FUNCTION pg_catalog.citus_rebalance_start(name, bool, citus.shard_transfer_mode);
DROP FUNCTION pg_catalog.citus_rebalance_stop();
DROP FUNCTION pg_catalog.citus_rebalance_wait();
DROP FUNCTION pg_catalog.citus_job_cancel(bigint);
DROP FUNCTION pg_catalog.citus_job_wait(bigint, pg_catalog.citus_job_status);
DROP TABLE pg_catalog.pg_dist_background_task_depend;

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,
shard_transfer_mode citus.shard_transfer_mode default 'auto'
)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode)
IS 'rebalance the shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC;

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,
shard_transfer_mode citus.shard_transfer_mode default 'auto'
)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode)
IS 'rebalance the shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_stop()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_stop()
IS 'stop a rebalance that is running in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_stop() TO PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_stop()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_stop()
IS 'stop a rebalance that is running in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_stop() TO PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_wait()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_wait()
IS 'wait on a running rebalance in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_wait() TO PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_wait()
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_wait()
IS 'wait on a running rebalance in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_wait() TO PUBLIC;

View File

@ -157,6 +157,22 @@ citus_job_wait(PG_FUNCTION_ARGS)
desiredStatus = BackgroundJobStatusByOid(PG_GETARG_OID(1));
}
citus_job_wait_internal(jobid, hasDesiredStatus ? &desiredStatus : NULL);
PG_RETURN_VOID();
}
/*
* citus_job_wait_internal imaplements the waiting on a job for reuse in other areas where
* we want to wait on jobs. eg the background rebalancer.
*
* When a desiredStatus is provided it will provide an error when a different state is
* reached and the state cannot ever reach the desired state anymore.
*/
void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{
/*
* Since we are wait polling we will actually allocate memory on every poll. To make
* sure we don't put unneeded pressure on the memory we create a context that we clear
@ -177,10 +193,9 @@ citus_job_wait(PG_FUNCTION_ARGS)
if (!job)
{
ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid)));
PG_RETURN_VOID();
}
if (hasDesiredStatus && job->state == desiredStatus)
if (desiredStatus && job->state == *desiredStatus)
{
/* job has reached its desired status, done waiting */
break;
@ -188,7 +203,7 @@ citus_job_wait(PG_FUNCTION_ARGS)
if (IsBackgroundJobStatusTerminal(job->state))
{
if (hasDesiredStatus)
if (desiredStatus)
{
/*
* We have reached a terminal state, which is not the desired state we
@ -201,7 +216,7 @@ citus_job_wait(PG_FUNCTION_ARGS)
reachedStatusOid);
char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);
Oid desiredStatusOid = BackgroundJobStatusOid(desiredStatus);
Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
desiredStatusOid);
char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);
@ -228,8 +243,6 @@ citus_job_wait(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(waitContext);
PG_RETURN_VOID();
}

View File

@ -293,6 +293,63 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
}
/*
* HasNodesWithMissingReferenceTables checks if all reference tables are already copied to
* all nodes. When a node doesn't have a copy of the reference tables we call them missing
* and this function will return true.
*
* The caller might be interested in the list of all reference tables after this check and
* this the list of tables is written to *referenceTableList if a non-null pointer is
* passed.
*/
bool
HasNodesWithMissingReferenceTables(List **referenceTableList)
{
int colocationId = GetReferenceTableColocationId();
if (colocationId == INVALID_COLOCATION_ID)
{
/* we have no reference table yet. */
return false;
}
LockColocationId(colocationId, AccessShareLock);
List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableList)
{
*referenceTableList = referenceTableIdList;
}
if (list_length(referenceTableIdList) <= 0)
{
return false;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
const char *referenceTableName = get_rel_name(referenceTableId);
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
AccessShareLock);
if (list_length(newWorkersList) <= 0)
{
return false;
}
return true;
}
/*
* AnyRelationsModifiedInTransaction returns true if any of the given relations
* were modified in the current transaction.

View File

@ -15,9 +15,15 @@
#include "postmaster/bgworker.h"
#include "distributed/metadata_utility.h"
extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database,
Oid extensionOwner);
extern void CitusBackgroundTaskQueueMonitorMain(Datum arg);
extern void CitusBackgroundTaskExecuter(Datum main_arg);
extern Datum citus_job_cancel(PG_FUNCTION_ARGS);
extern Datum citus_job_wait(PG_FUNCTION_ARGS);
extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
#endif /*CITUS_BACKGROUND_JOBS_H */

View File

@ -384,6 +384,7 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasRunnableBackgroundTask(void);
extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut);
extern int64 CreateBackgroundJob(const char *jobType, const char *description);
extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command,
int dependingTaskCount,

View File

@ -20,6 +20,7 @@
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern bool HasNodesWithMissingReferenceTables(List **referenceTableList);
extern uint32 CreateReferenceTableColocationId(void);
extern uint32 GetReferenceTableColocationId(void);
extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,

View File

@ -283,3 +283,8 @@ s/^(DETAIL: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg
# will be replaced with
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g
# remove jobId's from the messages of the background rebalancer
s/^ERROR: A rebalance is already running as job [0-9]+$/ERROR: A rebalance is already running as job xxx/g
s/^NOTICE: Scheduled ([0-9]+) moves as job [0-9]+$/NOTICE: Scheduled \1 moves as job xxx/g
s/^HINT: (.*) job_id = [0-9]+ (.*)$/HINT: \1 job_id = xxx \2/g

View File

@ -0,0 +1,180 @@
CREATE SCHEMA background_rebalance;
SET search_path TO background_rebalance;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE t1 (a int PRIMARY KEY);
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- verify the rebalance works - no-op - when the shards are balanced. Noop is shown by wait complaining there is nothing
-- to wait on.
SELECT 1 FROM citus_rebalance_start();
NOTICE: No moves available for rebalancing
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_wait();
WARNING: no ongoing rebalance that can be waited on
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- rebalance a table in the background
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
CREATE TABLE t2 (a int);
SELECT create_distributed_table('t2', 'a' , colocate_with => 't1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- show that we get an error when a table in the colocation group can't be moved non-blocking
SELECT 1 FROM citus_rebalance_start();
ERROR: cannot use logical replication to transfer shards of the relation t2 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode => 'block_writes');
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
DROP TABLE t2;
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- show we can stop a rebalance, the stop causes the move to not have happened, eg, our move back below fails.
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_stop();
citus_rebalance_stop
---------------------------------------------------------------------
(1 row)
-- waiting on this rebalance is racy, as it sometimes sees no rebalance is ongoing while other times it actually sees it ongoing
-- we simply sleep a bit here
SELECT pg_sleep(1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- failing move due to a stopped rebalance, first clean orphans to make the error stable
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_shards();
RESET client_min_messages;
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
WARNING: shard is already present on node localhost:xxxxx
DETAIL: Move may have already completed.
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- show we can't start the rebalancer twice
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_rebalance_start();
ERROR: A rebalance is already running as job xxx
DETAIL: A rebalance was already scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- show that the old rebalancer cannot be started with a background rebalance in progress
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_rebalance_start();
NOTICE: Scheduled 1 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT rebalance_table_shards();
ERROR: A rebalance is already running as job xxx
DETAIL: A rebalance was already scheduled as background job
HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress();
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;

View File

@ -575,3 +575,31 @@ master_set_node_property
(1 row)
starting permutation: s1-rebalance-all s2-citus-rebalance-start s1-commit
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-rebalance-all:
BEGIN;
select rebalance_table_shards();
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
step s2-citus-rebalance-start:
SELECT 1 FROM citus_rebalance_start();
ERROR: could not acquire the lock required to rebalance public.colocated1
step s1-commit:
COMMIT;
master_set_node_property
---------------------------------------------------------------------
(1 row)

View File

@ -1143,6 +1143,9 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_job_cancel(bigint) void
| function citus_job_wait(bigint,citus_job_status) void
| function citus_locks() SETOF record
| function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint
| function citus_rebalance_stop() void
| function citus_rebalance_wait() void
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
| function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void
| function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint
@ -1165,7 +1168,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| type split_copy_info
| type split_shard_info
| view citus_locks
(52 rows)
(55 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -90,6 +90,9 @@ ORDER BY 1;
function citus_pid_for_gpid(bigint)
function citus_prepare_pg_upgrade()
function citus_query_stats()
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode)
function citus_rebalance_stop()
function citus_rebalance_wait()
function citus_relation_size(regclass)
function citus_remote_connection_stats()
function citus_remove_node(text,integer)
@ -283,5 +286,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(275 rows)
(278 rows)

View File

@ -3,6 +3,7 @@ test: multi_cluster_management
test: multi_test_catalog_views
test: shard_rebalancer_unit
test: shard_rebalancer
test: background_rebalance
test: worker_copy_table_to_node
test: foreign_key_to_reference_shard_rebalance
test: multi_move_mx

View File

@ -44,6 +44,12 @@ step "s1-replicate-nc"
select replicate_table_shards('non_colocated');
}
step "s1-rebalance-all"
{
BEGIN;
select rebalance_table_shards();
}
step "s1-commit"
{
COMMIT;
@ -82,6 +88,11 @@ step "s2-drain"
select master_drain_node('localhost', 57638);
}
step "s2-citus-rebalance-start"
{
SELECT 1 FROM citus_rebalance_start();
}
// disallowed because it's the same table
permutation "s1-rebalance-nc" "s2-rebalance-nc" "s1-commit"
@ -112,3 +123,6 @@ permutation "s1-rebalance-c1" "s2-drain" "s1-commit"
permutation "s1-replicate-c1" "s2-drain" "s1-commit"
permutation "s1-rebalance-nc" "s2-drain" "s1-commit"
permutation "s1-replicate-nc" "s2-drain" "s1-commit"
// disallow the background rebalancer to run when rebalance_table_shard rung
permutation "s1-rebalance-all" "s2-citus-rebalance-start" "s1-commit"

View File

@ -0,0 +1,64 @@
CREATE SCHEMA background_rebalance;
SET search_path TO background_rebalance;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
CREATE TABLE t1 (a int PRIMARY KEY);
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
-- verify the rebalance works - no-op - when the shards are balanced. Noop is shown by wait complaining there is nothing
-- to wait on.
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
-- rebalance a table in the background
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
CREATE TABLE t2 (a int);
SELECT create_distributed_table('t2', 'a' , colocate_with => 't1');
-- show that we get an error when a table in the colocation group can't be moved non-blocking
SELECT 1 FROM citus_rebalance_start();
SELECT 1 FROM citus_rebalance_start(shard_transfer_mode => 'block_writes');
SELECT citus_rebalance_wait();
DROP TABLE t2;
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
-- show we can stop a rebalance, the stop causes the move to not have happened, eg, our move back below fails.
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_stop();
-- waiting on this rebalance is racy, as it sometimes sees no rebalance is ongoing while other times it actually sees it ongoing
-- we simply sleep a bit here
SELECT pg_sleep(1);
-- failing move due to a stopped rebalance, first clean orphans to make the error stable
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_shards();
RESET client_min_messages;
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
-- show we can't start the rebalancer twice
SELECT 1 FROM citus_rebalance_start();
SELECT 1 FROM citus_rebalance_start();
SELECT citus_rebalance_wait();
-- show that the old rebalancer cannot be started with a background rebalance in progress
SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes');
SELECT 1 FROM citus_rebalance_start();
SELECT rebalance_table_shards();
SELECT citus_rebalance_wait();
SET client_min_messages TO WARNING;
DROP SCHEMA background_rebalance CASCADE;