diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 8c57357c9..b7d20a9b5 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2724,6 +2724,79 @@ GetNextBackgroundTaskTaskId(void) } +/* + * HasNonTerminalJobOfType returns true if there is a job of a given type that is not in + * a terminal state. + * + * Some jobs should have at most one instance running at a time. Before submitting a new + * job, a caller can check whether a job of its type is already executing. + * + * If a job is found, the optional jobIdOut is populated with its jobId. + */ +bool +HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut) +{ + Relation pgDistBackgroundJob = + table_open(DistBackgroundJobRelationId(), AccessShareLock); + + /* find any job in states listed here */ + BackgroundJobStatus jobStatus[] = { + BACKGROUND_JOB_STATUS_RUNNING, + BACKGROUND_JOB_STATUS_CANCELLING, + BACKGROUND_JOB_STATUS_FAILING, + BACKGROUND_JOB_STATUS_SCHEDULED + }; + + NameData jobTypeName = { 0 }; + namestrcpy(&jobTypeName, jobType); + + bool foundJob = false; + for (int i = 0; !foundJob && i < lengthof(jobStatus); i++) + { + ScanKeyData scanKey[2] = { 0 }; + const bool indexOK = true; + + /* pg_dist_background_job.state == jobStatus[i] */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_job_state, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(BackgroundJobStatusOid(jobStatus[i]))); + + /* pg_dist_background_job.job_type == jobType */ + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_job_job_type, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(&jobTypeName)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistBackgroundJob, + InvalidOid, /* TODO use an actual index here */ + indexOK, NULL, lengthof(scanKey), scanKey); + + HeapTuple taskTuple = NULL; + if (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) + { + foundJob = true; + + if (jobIdOut) + { + Datum values[Natts_pg_dist_background_job] = { 0 }; + bool isnull[Natts_pg_dist_background_job] = { 0 }; + + TupleDesc tupleDesc = RelationGetDescr(pgDistBackgroundJob); + heap_deform_tuple(taskTuple, tupleDesc, values, isnull); + + *jobIdOut = DatumGetInt64(values[Anum_pg_dist_background_job_job_id - 1]); + } + } + + systable_endscan(scanDescriptor); + } + + table_close(pgDistBackgroundJob, NoLock); + + return foundJob; +} + +
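For reference, the catalog scan above is the C-level counterpart of the following query; a minimal sketch, assuming the pg_dist_background_job catalog added earlier in this release exposes job_id, job_type, and state columns:

```sql
-- Hedged sketch of what HasNonTerminalJobOfType('rebalance', &jobId) checks:
-- is any job of the given type still scheduled, running, cancelling, or failing?
SELECT job_id
FROM pg_catalog.pg_dist_background_job
WHERE job_type = 'rebalance'
  AND state IN ('scheduled', 'running', 'cancelling', 'failing')
LIMIT 1;
```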
/* + * CreateBackgroundJob is a helper function to insert a new Background Job into Citus' + * catalog. After inserting the new job's metadata into the catalog it returns the job_id diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 0e9b8db1d..60962deac 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -26,6 +26,7 @@ #include "commands/dbcommands.h" #include "commands/sequence.h" #include "distributed/argutils.h" +#include "distributed/background_jobs.h" #include "distributed/citus_safe_lib.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" @@ -73,6 +74,7 @@ typedef struct RebalanceOptions bool drainOnly; float4 improvementThreshold; Form_pg_dist_rebalance_strategy rebalanceStrategy; + const char *operationName; } RebalanceOptions; @@ -227,6 +229,8 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context); static ShardCost GetShardCost(uint64 shardId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); +static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid + shardReplicationModeOid); static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); @@ -245,6 +249,8 @@ static uint64 WorkerShardSize(HTAB *workerShardStatistics, static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int workerPort, uint64 shardId); static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics); +static void ErrorOnConcurrentRebalance(RebalanceOptions *options); + /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(rebalance_table_shards); @@ -256,6 +262,9 @@ PG_FUNCTION_INFO_V1(master_drain_node); PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size); PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions); PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); +PG_FUNCTION_INFO_V1(citus_rebalance_start); +PG_FUNCTION_INFO_V1(citus_rebalance_stop); +PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; @@ -858,6 +867,93 @@ rebalance_table_shards(PG_FUNCTION_ARGS) } + +/* + * citus_rebalance_start rebalances the shards across the workers.
+ * + * SQL signature: + * + * citus_rebalance_start( + * rebalance_strategy name DEFAULT NULL, + * drain_only boolean DEFAULT false, + * shard_transfer_mode citus.shard_transfer_mode default 'auto' + * ) RETURNS bigint + */ +Datum +citus_rebalance_start(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + List *relationIdList = NonColocatedDistRelationIdList(); + Form_pg_dist_rebalance_strategy strategy = + GetRebalanceStrategy(PG_GETARG_NAME_OR_NULL(0)); + + PG_ENSURE_ARGNOTNULL(1, "drain_only"); + bool drainOnly = PG_GETARG_BOOL(1); + + PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); + Oid shardTransferModeOid = PG_GETARG_OID(2); + + RebalanceOptions options = { + .relationIdList = relationIdList, + .threshold = strategy->defaultThreshold, + .maxShardMoves = 10000000, + .excludedShardArray = construct_empty_array(INT4OID), + .drainOnly = drainOnly, + .rebalanceStrategy = strategy, + .improvementThreshold = strategy->improvementThreshold, + }; + int64 jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid); + + if (jobId == 0) + { + PG_RETURN_NULL(); + } + PG_RETURN_INT64(jobId); +} + + +/* + * citus_rebalance_stop stops any ongoing background rebalance. + * Raises an error when no background rebalance is currently ongoing. + */ +Datum +citus_rebalance_stop(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int64 jobId = 0; + if (!HasNonTerminalJobOfType("rebalance", &jobId)) + { + ereport(ERROR, (errmsg("no ongoing rebalance that can be stopped"))); + } + + DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId)); + + PG_RETURN_VOID(); +} + + +/* + * citus_rebalance_wait waits until an ongoing background rebalance has finished execution. + * A warning will be displayed if no rebalance is ongoing. + */ +Datum +citus_rebalance_wait(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + int64 jobId = 0; + if (!HasNonTerminalJobOfType("rebalance", &jobId)) + { + ereport(WARNING, (errmsg("no ongoing rebalance that can be waited on"))); + PG_RETURN_VOID(); + } + + citus_job_wait_internal(jobId, NULL); + + PG_RETURN_VOID(); +} + + /* * GetRebalanceStrategy returns the rebalance strategy from * pg_dist_rebalance_strategy matching the given name. If name is NULL it @@ -1579,17 +1675,14 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) return; } - Oid relationId = InvalidOid; char *operationName = "rebalance"; if (options->drainOnly) { operationName = "move"; } - foreach_oid(relationId, options->relationIdList) - { - AcquireRebalanceColocationLock(relationId, operationName); - } + options->operationName = operationName; + ErrorOnConcurrentRebalance(options); List *placementUpdateList = GetRebalanceSteps(options); @@ -1609,6 +1702,168 @@ } +
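Taken together, the three UDFs above give the following user-facing flow; a sketch of a typical session, with the transfer mode chosen for illustration only:

```sql
-- Kick off a background rebalance; returns the job id of the scheduled job.
SELECT citus_rebalance_start(shard_transfer_mode => 'block_writes');

-- Block until that job reaches a terminal state.
SELECT citus_rebalance_wait();

-- Alternatively, cancel the scheduled and running moves.
SELECT citus_rebalance_stop();
```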
+/* + * ErrorOnConcurrentRebalance raises an error with extra information when there is already + * a rebalance running. + */ +static void +ErrorOnConcurrentRebalance(RebalanceOptions *options) +{ + Oid relationId = InvalidOid; + foreach_oid(relationId, options->relationIdList) + { + /* this provides the legacy error when the lock can't be acquired */ + AcquireRebalanceColocationLock(relationId, options->operationName); + } + + int64 jobId = 0; + if (HasNonTerminalJobOfType("rebalance", &jobId)) + { + ereport(ERROR, ( + errmsg("A rebalance is already running as job %ld", jobId), + errdetail("A rebalance was already scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "pg_dist_background_task WHERE job_id = %ld ORDER BY task_id " + "ASC; or SELECT * FROM get_rebalance_progress();", + jobId))); + } +} + + +/* + * RebalanceTableShardsBackground rebalances the shards for the relations + * inside the relationIdList across the different workers. It does so using our + * background job+task infrastructure. + */ +static int64 +RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid) +{ + if (list_length(options->relationIdList) == 0) + { + ereport(NOTICE, (errmsg("No tables to rebalance"))); + return 0; + } + + char *operationName = "rebalance"; + if (options->drainOnly) + { + operationName = "move"; + } + + options->operationName = operationName; + ErrorOnConcurrentRebalance(options); + + const char shardTransferMode = LookupShardTransferMode(shardReplicationModeOid); + List *colocatedTableList = NIL; + Oid relationId = InvalidOid; + foreach_oid(relationId, options->relationIdList) + { + colocatedTableList = list_concat(colocatedTableList, + ColocatedTableList(relationId)); + } + Oid colocatedTableId = InvalidOid; + foreach_oid(colocatedTableId, colocatedTableList) + { + EnsureTableOwner(colocatedTableId); + } + + if (shardTransferMode == TRANSFER_MODE_AUTOMATIC) + { + /* make sure that all tables included in the rebalance have a replica identity */ + VerifyTablesHaveReplicaIdentity(colocatedTableList); + } + + List *placementUpdateList = GetRebalanceSteps(options); + + if (list_length(placementUpdateList) == 0) + { + ereport(NOTICE, (errmsg("No moves available for rebalancing"))); + return 0; + } + + DropOrphanedShardsInSeparateTransaction(); + + /* find the name of the shard transfer mode to interpolate in the scheduled command */ + Datum shardTransferModeLabelDatum = + DirectFunctionCall1(enum_out, shardReplicationModeOid); + char *shardTransferModeLabel = DatumGetCString(shardTransferModeLabelDatum); + + /* schedule planned moves */ + int64 jobId = CreateBackgroundJob("rebalance", "Rebalance all colocation groups"); + + /* buffer used to construct the SQL command for the tasks */ + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + /* + * Currently we only have two tasks that any move can depend on: + * - replicating reference tables + * - the previous move + * + * prevTaskIdx tells which slot to write the id of the task into. We only use both + * slots if we are actually replicating reference tables. + */ + int64 prevTaskId[2] = { 0 }; + int prevTaskIdx = 0; + + List *referenceTableIdList = NIL; + + if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) + { + VerifyTablesHaveReplicaIdentity(referenceTableIdList); + + /* + * Reference tables need to be copied to (newly added) nodes; this task needs to + * run before we can move any other table.
+ */ + appendStringInfo(&buf, + "SELECT pg_catalog.replicate_reference_tables(%s)", + quote_literal_cstr(shardTransferModeLabel)); + BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, + prevTaskIdx, prevTaskId); + prevTaskId[prevTaskIdx] = task->taskid; + prevTaskIdx++; + } + + PlacementUpdateEvent *move = NULL; + bool first = true; + int prevMoveIndex = prevTaskIdx; + foreach_ptr(move, placementUpdateList) + { + resetStringInfo(&buf); + + appendStringInfo(&buf, + "SELECT pg_catalog.citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)", + move->shardId, + quote_literal_cstr(move->sourceNode->workerName), + move->sourceNode->workerPort, + quote_literal_cstr(move->targetNode->workerName), + move->targetNode->workerPort, + quote_literal_cstr(shardTransferModeLabel)); + + BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, + prevTaskIdx, prevTaskId); + prevTaskId[prevMoveIndex] = task->taskid; + if (first) + { + first = false; + prevTaskIdx++; + } + } + + ereport(NOTICE, + (errmsg("Scheduled %d moves as job %ld", + list_length(placementUpdateList), jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: " + "SELECT * FROM pg_dist_background_task WHERE job_id = %ld ORDER BY " + "task_id ASC; or SELECT * FROM get_rebalance_progress();", + jobId))); + + return jobId; +} + + /* * UpdateShardPlacement copies or moves a shard placement by calling * the corresponding functions in Citus in a subtransaction. diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index d5e157fdb..00c28f22c 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -170,3 +170,6 @@ GRANT SELECT ON pg_catalog.pg_dist_background_task_depend TO PUBLIC; #include "udfs/citus_job_wait/11.1-1.sql" #include "udfs/citus_job_cancel/11.1-1.sql" +#include "udfs/citus_rebalance_start/11.1-1.sql" +#include "udfs/citus_rebalance_stop/11.1-1.sql" +#include "udfs/citus_rebalance_wait/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index fbfc4eddf..2a7462e0d 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -108,6 +108,10 @@ DROP TABLE pg_catalog.pg_dist_cleanup; DROP SEQUENCE pg_catalog.pg_dist_operationid_seq; DROP SEQUENCE pg_catalog.pg_dist_cleanup_recordid_seq; DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_resources(); + +DROP FUNCTION pg_catalog.citus_rebalance_start(name, bool, citus.shard_transfer_mode); +DROP FUNCTION pg_catalog.citus_rebalance_stop(); +DROP FUNCTION pg_catalog.citus_rebalance_wait(); DROP FUNCTION pg_catalog.citus_job_cancel(bigint); DROP FUNCTION pg_catalog.citus_job_wait(bigint, pg_catalog.citus_job_status); DROP TABLE pg_catalog.pg_dist_background_task_depend; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql new file mode 100644 index 000000000..cc84d3142 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( + rebalance_strategy name DEFAULT NULL, + drain_only boolean DEFAULT false, + shard_transfer_mode citus.shard_transfer_mode default 'auto' + ) + RETURNS bigint + AS
'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) + IS 'rebalance the shards in the cluster in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql new file mode 100644 index 000000000..cc84d3142 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( + rebalance_strategy name DEFAULT NULL, + drain_only boolean DEFAULT false, + shard_transfer_mode citus.shard_transfer_mode default 'auto' + ) + RETURNS bigint + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) + IS 'rebalance the shards in the cluster in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_stop/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_stop/11.1-1.sql new file mode 100644 index 000000000..46ef49996 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_stop/11.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_stop() + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_stop() + IS 'stop a rebalance that is running in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_stop() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_stop/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_stop/latest.sql new file mode 100644 index 000000000..46ef49996 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_stop/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_stop() + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_stop() + IS 'stop a rebalance that is running in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_stop() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_wait/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_wait/11.1-1.sql new file mode 100644 index 000000000..4e78ec621 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_wait/11.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_wait() + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_wait() + IS 'wait on a running rebalance in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_wait() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_wait/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_wait/latest.sql new file mode 100644 index 000000000..4e78ec621 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_wait/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_wait() + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_wait() + IS 'wait on a running rebalance in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_wait() TO PUBLIC; 
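The NOTICE and ERROR hints emitted around the scheduled job point users at the background-task catalog; a sketch of the suggested monitoring queries, where the job id 4 is purely illustrative:

```sql
-- Inspect the tasks scheduled for a given background rebalance job.
SELECT * FROM pg_dist_background_task WHERE job_id = 4 ORDER BY task_id ASC;

-- Or follow the shard moves through the existing progress UDF.
SELECT * FROM get_rebalance_progress();
```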
diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index f28bef486..6b7cb024b 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -157,6 +157,22 @@ citus_job_wait(PG_FUNCTION_ARGS) desiredStatus = BackgroundJobStatusByOid(PG_GETARG_OID(1)); } + citus_job_wait_internal(jobid, hasDesiredStatus ? &desiredStatus : NULL); + + PG_RETURN_VOID(); +} + + +/* + * citus_job_wait_internal implements waiting on a job, for reuse in other areas where + * we want to wait on jobs, e.g. the background rebalancer. + * + * When a desiredStatus is provided it will raise an error when a different terminal + * state is reached and the desired state can never be reached anymore. + */ +void +citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus) +{ /* * Since we are wait polling we will actually allocate memory on every poll. To make * sure we don't put unneeded pressure on the memory we create a context that we clear @@ -177,10 +193,9 @@ if (!job) { ereport(ERROR, (errmsg("no job found for job with jobid: %ld", jobid))); - PG_RETURN_VOID(); } - if (hasDesiredStatus && job->state == desiredStatus) + if (desiredStatus && job->state == *desiredStatus) { /* job has reached its desired status, done waiting */ break; @@ -188,7 +203,7 @@ if (IsBackgroundJobStatusTerminal(job->state)) { - if (hasDesiredStatus) + if (desiredStatus) { /* * We have reached a terminal state, which is not the desired state we @@ -201,7 +216,7 @@ reachedStatusOid); char *reachedStatusName = DatumGetCString(reachedStatusNameDatum); - Oid desiredStatusOid = BackgroundJobStatusOid(desiredStatus); + Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus); Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out, desiredStatusOid); char *desiredStatusName = DatumGetCString(desiredStatusNameDatum); @@ -228,8 +243,6 @@ MemoryContextSwitchTo(oldContext); MemoryContextDelete(waitContext); - - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8b37674d0..a00e7de3b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -293,6 +293,63 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } + +/* + * HasNodesWithMissingReferenceTables checks whether all reference tables have already + * been copied to all nodes. When a node doesn't have a copy of the reference tables we + * call them missing, and this function returns true. + * + * The caller might be interested in the list of all reference tables after this check, + * so the list of tables is written to *referenceTableList if a non-null pointer is + * passed. + */ +bool +HasNodesWithMissingReferenceTables(List **referenceTableList) +{ + int colocationId = GetReferenceTableColocationId(); + + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet.
*/ + return false; + } + LockColocationId(colocationId, AccessShareLock); + + List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableList) + { + *referenceTableList = referenceTableIdList; + } + + if (list_length(referenceTableIdList) <= 0) + { + return false; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + const char *referenceTableName = get_rel_name(referenceTableId); + + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, + AccessShareLock); + + if (list_length(newWorkersList) <= 0) + { + return false; + } + + return true; +} + + /* * AnyRelationsModifiedInTransaction returns true if any of the given relations * were modified in the current transaction. diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index e38e57569..d814a2165 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -15,9 +15,15 @@ #include "postmaster/bgworker.h" +#include "distributed/metadata_utility.h" + extern BackgroundWorkerHandle * StartCitusBackgroundTaskQueueMonitor(Oid database, Oid extensionOwner); extern void CitusBackgroundTaskQueueMonitorMain(Datum arg); extern void CitusBackgroundTaskExecuter(Datum main_arg); +extern Datum citus_job_cancel(PG_FUNCTION_ARGS); +extern Datum citus_job_wait(PG_FUNCTION_ARGS); +extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus); + #endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 748c9ff81..5376dd858 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -384,6 +384,7 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasRunnableBackgroundTask(void); +extern bool HasNonTerminalJobOfType(const char *jobType, int64 *jobIdOut); extern int64 CreateBackgroundJob(const char *jobType, const char *description); extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount, diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 80b282126..ce2de9d9d 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -20,6 +20,7 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 531029bcd..353707c21 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -283,3 +283,8 @@ s/^(DETAIL: "[a-z\ 
]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg # will be replaced with # WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx" s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g + +# remove job ids from the messages of the background rebalancer +s/^ERROR: A rebalance is already running as job [0-9]+$/ERROR: A rebalance is already running as job xxx/g +s/^NOTICE: Scheduled ([0-9]+) moves as job [0-9]+$/NOTICE: Scheduled \1 moves as job xxx/g +s/^HINT: (.*) job_id = [0-9]+ (.*)$/HINT: \1 job_id = xxx \2/g diff --git a/src/test/regress/expected/background_rebalance.out b/src/test/regress/expected/background_rebalance.out new file mode 100644 index 000000000..32a5e86b0 --- /dev/null +++ b/src/test/regress/expected/background_rebalance.out @@ -0,0 +1,180 @@ +CREATE SCHEMA background_rebalance; +SET search_path TO background_rebalance; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- verify the rebalance is a no-op when the shards are already balanced; the no-op shows up as the wait call warning +-- that there is nothing to wait on. +SELECT 1 FROM citus_rebalance_start(); +NOTICE: No moves available for rebalancing + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_wait(); +WARNING: no ongoing rebalance that can be waited on + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- rebalance a table in the background +SELECT 1 FROM citus_rebalance_start(); +NOTICE: Scheduled 1 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE t2 (a int); +SELECT create_distributed_table('t2', 'a' , colocate_with => 't1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- show that we get an error when a table in the colocation group can't be moved non-blocking +SELECT 1 FROM citus_rebalance_start(); +ERROR: cannot use logical replication to transfer shards of the relation t2 since it doesn't have a REPLICA IDENTITY or PRIMARY KEY +DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. +HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. +SELECT 1 FROM citus_rebalance_start(shard_transfer_mode => 'block_writes'); +NOTICE: Scheduled 1 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE t2; +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- show we can stop a rebalance; stopping prevents the move from happening, e.g. our move back below fails. +SELECT 1 FROM citus_rebalance_start(); +NOTICE: Scheduled 1 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_stop(); + citus_rebalance_stop +--------------------------------------------------------------------- + +(1 row) + +-- waiting on this rebalance is racy, as it sometimes sees no rebalance ongoing while at other times it does; +-- we simply sleep a bit here +SELECT pg_sleep(1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- failing move due to a stopped rebalance, first clean orphans to make the error stable +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_shards(); +RESET client_min_messages; +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); +WARNING: shard is already present on node localhost:xxxxx +DETAIL: Move may have already completed.
+ citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- show we can't start the rebalancer twice +SELECT 1 FROM citus_rebalance_start(); +NOTICE: Scheduled 1 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_rebalance_start(); +ERROR: A rebalance is already running as job xxx +DETAIL: A rebalance was already scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- show that the old rebalancer cannot be started with a background rebalance in progress +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_rebalance_start(); +NOTICE: Scheduled 1 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT rebalance_table_shards(); +ERROR: A rebalance is already running as job xxx +DETAIL: A rebalance was already scheduled as background job +HINT: To monitor progress, run: SELECT * FROM pg_dist_background_task WHERE job_id = xxx ORDER BY task_id ASC; or SELECT * FROM get_rebalance_progress(); +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO WARNING; +DROP SCHEMA background_rebalance CASCADE; diff --git a/src/test/regress/expected/isolation_shard_rebalancer.out b/src/test/regress/expected/isolation_shard_rebalancer.out index 1d6779b7c..2385f239a 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer.out +++ b/src/test/regress/expected/isolation_shard_rebalancer.out @@ -575,3 +575,31 @@ master_set_node_property (1 row) + +starting permutation: s1-rebalance-all s2-citus-rebalance-start s1-commit +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-rebalance-all: + BEGIN; + select rebalance_table_shards(); + +rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +step s2-citus-rebalance-start: + SELECT 1 FROM citus_rebalance_start(); + +ERROR: could not acquire the lock required to rebalance public.colocated1 +step s1-commit: + COMMIT; + +master_set_node_property +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index f76712647..5f37f7a32 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1143,6 +1143,9 @@ SELECT * FROM multi_extension.print_extension_changes(); | function 
citus_job_cancel(bigint) void | function citus_job_wait(bigint,citus_job_status) void | function citus_locks() SETOF record + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint + | function citus_rebalance_stop() void + | function citus_rebalance_wait() void | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void | function create_distributed_table_concurrently(regclass,text,citus.distribution_type,text,integer) void | function isolate_tenant_to_new_shard(regclass,"any",text,citus.shard_transfer_mode) bigint @@ -1165,7 +1168,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | type split_copy_info | type split_shard_info | view citus_locks -(52 rows) +(55 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index bb04fcfb9..085a47769 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -90,6 +90,9 @@ ORDER BY 1; function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) + function citus_rebalance_stop() + function citus_rebalance_wait() function citus_relation_size(regclass) function citus_remote_connection_stats() function citus_remove_node(text,integer) @@ -283,5 +286,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(275 rows) +(278 rows) diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 3eeb3e8db..96eafc3ea 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -3,6 +3,7 @@ test: multi_cluster_management test: multi_test_catalog_views test: shard_rebalancer_unit test: shard_rebalancer +test: background_rebalance test: worker_copy_table_to_node test: foreign_key_to_reference_shard_rebalance test: multi_move_mx diff --git a/src/test/regress/spec/isolation_shard_rebalancer.spec b/src/test/regress/spec/isolation_shard_rebalancer.spec index 1aca39ca6..d9d8d99ed 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer.spec @@ -44,6 +44,12 @@ step "s1-replicate-nc" { select replicate_table_shards('non_colocated'); } +step "s1-rebalance-all" +{ + BEGIN; + select rebalance_table_shards(); +} + step "s1-commit" { COMMIT; } @@ -82,6 +88,11 @@ step "s2-drain" { select master_drain_node('localhost', 57638); } +step "s2-citus-rebalance-start" +{ + SELECT 1 FROM citus_rebalance_start(); +} + // disallowed because it's the same table permutation "s1-rebalance-nc" "s2-rebalance-nc" "s1-commit" @@ -112,3 +123,6 @@ permutation "s1-rebalance-c1" "s2-drain" "s1-commit" permutation "s1-replicate-c1" "s2-drain" "s1-commit" permutation "s1-rebalance-nc" "s2-drain" "s1-commit" permutation "s1-replicate-nc" "s2-drain" "s1-commit" + +// disallow the background rebalancer from running while rebalance_table_shards runs +permutation "s1-rebalance-all" "s2-citus-rebalance-start" "s1-commit" diff --git a/src/test/regress/sql/background_rebalance.sql b/src/test/regress/sql/background_rebalance.sql new file mode 100644 index 000000000..9158fc532 --- /dev/null +++ b/src/test/regress/sql/background_rebalance.sql @@ -0,0 +1,64 @@ +CREATE SCHEMA background_rebalance; +SET search_path TO 
background_rebalance; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +SELECT pg_reload_conf(); + +CREATE TABLE t1 (a int PRIMARY KEY); +SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none'); + +-- verify the rebalance is a no-op when the shards are already balanced; the no-op shows up as the wait call warning +-- that there is nothing to wait on. +SELECT 1 FROM citus_rebalance_start(); +SELECT citus_rebalance_wait(); + +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + +-- rebalance a table in the background +SELECT 1 FROM citus_rebalance_start(); +SELECT citus_rebalance_wait(); + +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + +CREATE TABLE t2 (a int); +SELECT create_distributed_table('t2', 'a' , colocate_with => 't1'); + +-- show that we get an error when a table in the colocation group can't be moved non-blocking +SELECT 1 FROM citus_rebalance_start(); +SELECT 1 FROM citus_rebalance_start(shard_transfer_mode => 'block_writes'); +SELECT citus_rebalance_wait(); + +DROP TABLE t2; + +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + +-- show we can stop a rebalance; stopping prevents the move from happening, e.g. our move back below fails. +SELECT 1 FROM citus_rebalance_start(); +SELECT citus_rebalance_stop(); +-- waiting on this rebalance is racy, as it sometimes sees no rebalance ongoing while at other times it does; +-- we simply sleep a bit here +SELECT pg_sleep(1); + +-- failing move due to a stopped rebalance, first clean orphans to make the error stable +SET client_min_messages TO WARNING; +CALL citus_cleanup_orphaned_shards(); +RESET client_min_messages; +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); + + +-- show we can't start the rebalancer twice +SELECT 1 FROM citus_rebalance_start(); +SELECT 1 FROM citus_rebalance_start(); +SELECT citus_rebalance_wait(); + +-- show that the old rebalancer cannot be started with a background rebalance in progress +SELECT citus_move_shard_placement(85674000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode => 'block_writes'); +SELECT 1 FROM citus_rebalance_start(); +SELECT rebalance_table_shards(); +SELECT citus_rebalance_wait(); + + +SET client_min_messages TO WARNING; +DROP SCHEMA background_rebalance CASCADE;
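As a closing illustration of the scheduling done in RebalanceTableShardsBackground — an optional reference-table replication task first, then each move depending on its predecessor — the resulting task graph can be inspected directly; a sketch, assuming pg_dist_background_task_depend carries (job_id, task_id, depends_on) and pg_dist_background_task exposes task_id, status, and command, with the job id 4 purely illustrative:

```sql
-- List each task of a rebalance job together with the task(s) it waits on.
SELECT t.task_id, t.status, d.depends_on, t.command
FROM pg_dist_background_task t
LEFT JOIN pg_dist_background_task_depend d USING (job_id, task_id)
WHERE t.job_id = 4
ORDER BY t.task_id;
```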