From 6fa08619cf3d43d8c060d7129b95f9a6614fceff Mon Sep 17 00:00:00 2001
From: Nils Dijk
Date: Wed, 20 Jul 2022 17:07:11 +0200
Subject: [PATCH] reset job status' on restart of background worker

---
 .../distributed/metadata/metadata_utility.c   | 62 ++++++++++++++++++-
 src/backend/distributed/shared_library_init.c | 11 ++++
 src/backend/distributed/utils/maintenanced.c  | 22 +++++++-
 src/include/distributed/metadata_utility.h    |  1 +
 src/include/distributed/shard_cleaner.h       |  1 +
 5 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index deb90deff..711567542 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -2396,6 +2396,67 @@ ScheduleBackgrounRebalanceJob(char *command)
 }
 
 
+/*
+ * ResetRunningJobs moves every job in pg_dist_rebalance_jobs that has the
+ * 'running' status back to 'scheduled' and sets its pid to NULL.
+ *
+ * The rebalance jobs background worker calls this in a transaction of its own
+ * when it starts, before it begins executing scheduled jobs.
+ */
+void
+ResetRunningJobs(void)
+{
+	const int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+	const bool indexOK = true;
+
+	/* RowExclusiveLock instead of AccessShareLock: matching tuples are updated below */
+	Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
+											  RowExclusiveLock);
+
+	/* pg_dist_rebalance_jobs.status == 'running' */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
+				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusRunningId()));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
+													DistRebalanceJobsStatusJobsIdIndexId(),
+													indexOK, NULL, scanKeyCount,
+													scanKey);
+
+	HeapTuple jobTuple = NULL;
+	while (HeapTupleIsValid(jobTuple = systable_getnext(scanDescriptor)))
+	{
+		Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
+		bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
+		bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };
+
+		TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
+		heap_deform_tuple(jobTuple, tupleDescriptor, values, isnull);
+
+		values[Anum_pg_dist_rebalance_jobs_status - 1] =
+			ObjectIdGetDatum(JobStatusScheduledId());
+		isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
+		replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
+
+		/* clear the pid by marking the column NULL */
+		values[Anum_pg_dist_rebalance_jobs_pid - 1] = (Datum) 0;
+		isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
+		replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
+
+		jobTuple = heap_modify_tuple(jobTuple, tupleDescriptor, values, isnull, replace);
+
+		CatalogTupleUpdate(pgDistRebalanceJobs, &jobTuple->t_self, jobTuple);
+	}
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+
+	/* keep the lock until the surrounding transaction ends */
+	table_close(pgDistRebalanceJobs, NoLock);
+}
+
+
 RebalanceJob *
 GetScheduledRebalanceJob(void)
 {
@@ -2407,7 +2468,6 @@ GetScheduledRebalanceJob(void)
 											  AccessShareLock);
 
 	RebalanceJobStatus jobStatus[] = {
-		REBALANCE_JOB_STATUS_RUNNING,
 		REBALANCE_JOB_STATUS_SCHEDULED
 	};
 
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 632f693eb..4bd64d51a 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1892,6 +1892,17 @@ RegisterCitusConfigVariables(void)
 		GUC_UNIT_MS,
 		NULL, NULL, NULL);
 
+	/* TODO remove before merge; debug-only boolean, so it carries no unit flag */
+	DefineCustomBoolVariable(
+		"citus.rebalance_job_debug_delay",
+		NULL,
+		NULL,
+		&RebalanceJobDebugDelay,
+		false,
+		PGC_SIGHUP,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.recover_2pc_interval",
 		gettext_noop("Sets the time to wait between recovering 2PCs."),
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
index 4df29d9be..e0f797d03 100644
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -97,6 +97,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
 int Recover2PCInterval = 60000;
 int DeferShardDeleteInterval = 15000;
 int RebalanceCheckInterval = 1000;
+bool RebalanceJobDebugDelay = false;
 
 /* config variables for metadata sync timeout */
 int MetadataSyncInterval = 60000;
@@ -856,7 +857,10 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
 
 	ereport(LOG, (errmsg("background jobs runner")));
 
-/* pg_usleep(30 * 1000 * 1000); */
+	if (RebalanceJobDebugDelay)
+	{
+		pg_usleep(30 * 1000 * 1000);
+	}
 
 	MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext,
 																"PerJobContext",
@@ -864,6 +868,22 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
 																ALLOCSET_DEFAULT_INITSIZE,
 																ALLOCSET_DEFAULT_MAXSIZE);
 
+	/*
+	 * First we find all jobs that are running, we need to check if they are still running
+	 * if not reset their state back to scheduled.
+	 */
+	{
+		StartTransactionCommand();
+		PushActiveSnapshot(GetTransactionSnapshot());
+
+		/* TODO have an actual function to check if the worker is still running */
+		ResetRunningJobs();
+
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+	}
+
+
 	MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext);
 	bool hasJobs = true;
 	while (hasJobs)
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index a3cbed1cb..6bd0dadf3 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -333,6 +333,7 @@ extern bool HasScheduledRebalanceJobs(void);
 extern int64 GetNextRebalanceJobId(void);
 extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command);
 extern RebalanceJob * GetScheduledRebalanceJob(void);
+extern void ResetRunningJobs(void);
 extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
 extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
 extern void UpdateJobError(RebalanceJob *job, ErrorData *edata);
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index 14ced520d..00311e951 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -14,6 +14,7 @@
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern int RebalanceCheckInterval;
+extern bool RebalanceJobDebugDelay;
 extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;