From 836950ff07ae4da8098f59efd5c86c7807c484f1 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 18 Jul 2022 17:11:48 +0200 Subject: [PATCH] Add running state to rebalance job with pid reported --- .../distributed/metadata/metadata_cache.c | 14 +++ .../distributed/metadata/metadata_utility.c | 119 ++++++++++++------ .../distributed/sql/citus--11.0-4--11.1-1.sql | 3 +- src/backend/distributed/utils/maintenanced.c | 26 ++++ src/backend/distributed/utils/rebalance_job.c | 2 +- src/include/distributed/metadata_cache.h | 1 + src/include/distributed/metadata_utility.h | 4 +- .../distributed/pg_dist_rebalance_jobs.h | 12 +- 8 files changed, 137 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 69a1d40db..491046918 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -145,6 +145,7 @@ typedef struct MetadataCacheData Oid distRebalanceJobsJobsIndexId; Oid distRebalanceJobsStatusJobsIndexId; Oid jobStatusScheduledId; + Oid jobStatusRunningId; Oid jobStatusDoneId; Oid jobStatusErrorId; Oid distRebalanceStrategyRelationId; @@ -3135,6 +3136,19 @@ JobStatusScheduledId(void) } +Oid +JobStatusRunningId(void) +{ + if (!MetadataCache.jobStatusRunningId) + { + MetadataCache.jobStatusRunningId = + LookupStringEnumValueId("citus_job_status", "running"); + } + + return MetadataCache.jobStatusRunningId; +} + + Oid JobStatusDoneId(void) { diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 19f89a8a4..5d39f43cb 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2230,23 +2230,35 @@ HasScheduledRebalanceJobs() Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), AccessShareLock); - /* pg_dist_rebalance_jobs.status == 'scheduled' */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId())); - - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - HeapTuple jobTuple = systable_getnext(scanDescriptor); + /* find any job in states listed here */ + RebalanceJobStatus jobs[] = { + REBALANCE_JOB_STATUS_RUNNING, + REBALANCE_JOB_STATUS_SCHEDULED + }; bool hasScheduledJob = false; - if (HeapTupleIsValid(jobTuple)) + for (int i = 0; !hasScheduledJob && i < sizeof(jobs) / sizeof(jobs[0]); i++) { - hasScheduledJob = true; + /* pg_dist_rebalance_jobs.status == jobs[i] */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RebalanceJobStatusOid(jobs[i]))); + + SysScanDesc scanDescriptor = systable_beginscan( + pgDistRebalanceJobs, + DistRebalanceJobsStatusJobsIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); + + HeapTuple jobTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(jobTuple)) + { + hasScheduledJob = true; + } + + systable_endscan(scanDescriptor); } - systable_endscan(scanDescriptor); table_close(pgDistRebalanceJobs, AccessShareLock); return hasScheduledJob; @@ -2264,6 +2276,10 @@ RebalanceJobStatusByOid(Oid enumOid) { return REBALANCE_JOB_STATUS_SCHEDULED; } + else if (enumOid == JobStatusRunningId()) + { + return REBALANCE_JOB_STATUS_RUNNING; + } else if (enumOid == JobStatusErrorId()) { return REBALANCE_JOB_STATUS_ERROR; @@ -2292,7 +2308,7 @@ IsRebalanceJobStatusTerminal(RebalanceJobStatus status) } -static Oid +Oid RebalanceJobStatusOid(RebalanceJobStatus status) { switch (status) @@ -2302,6 +2318,11 @@ RebalanceJobStatusOid(RebalanceJobStatus status) return JobStatusScheduledId(); } + case REBALANCE_JOB_STATUS_RUNNING: + { + return JobStatusRunningId(); + } + case REBALANCE_JOB_STATUS_DONE: { return JobStatusDoneId(); @@ -2385,36 +2406,47 @@ GetScheduledRebalanceJob(void) Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), AccessShareLock); - /* pg_dist_rebalance_jobs.status == 'scheduled' */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId())); + RebalanceJobStatus jobStatus[] = { + REBALANCE_JOB_STATUS_RUNNING, + REBALANCE_JOB_STATUS_SCHEDULED + }; - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsStatusJobsIdIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - HeapTuple jobTuple = systable_getnext(scanDescriptor); RebalanceJob *job = NULL; - if (HeapTupleIsValid(jobTuple)) + for (int i = 0; !job && i < sizeof(jobStatus) / sizeof(jobStatus[0]); i++) { - Form_pg_dist_rebalance_job jobData = NULL; - jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple); + /* pg_dist_rebalance_jobs.status == jobStatus[i] */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum( + RebalanceJobStatusOid(jobStatus[i]))); - job = palloc0(sizeof(RebalanceJob)); - job->jobid = jobData->jobid; - job->status = RebalanceJobStatusByOid(jobData->status); + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsStatusJobsIdIndexId(), + indexOK, NULL, scanKeyCount, + scanKey); - /* TODO parse the actual job */ - Datum datumArray[Natts_pg_dist_rebalance_jobs]; - bool isNullArray[Natts_pg_dist_rebalance_jobs]; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); - heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); + HeapTuple jobTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(jobTuple)) + { + Form_pg_dist_rebalance_job jobData = NULL; + jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple); - job->command = text_to_cstring( - DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + job = palloc0(sizeof(RebalanceJob)); + job->jobid = jobData->jobid; + job->status = RebalanceJobStatusByOid(jobData->status); + + /* TODO parse the actual job */ + Datum datumArray[Natts_pg_dist_rebalance_jobs]; + bool isNullArray[Natts_pg_dist_rebalance_jobs]; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); + + job->command = text_to_cstring( + DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + } + + systable_endscan(scanDescriptor); } - systable_endscan(scanDescriptor); table_close(pgDistRebalanceJobs, AccessShareLock); return job; @@ -2422,7 +2454,7 @@ GetScheduledRebalanceJob(void) RebalanceJob * -GetScheduledRebalanceJobyJobID(int64 jobId) +GetScheduledRebalanceJobByJobID(int64 jobId) { const int scanKeyCount = 1; ScanKeyData scanKey[1]; @@ -2503,6 +2535,21 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + /* TODO figure out a nice way on how to update a tuple selectively */ + if (newStatus == REBALANCE_JOB_STATUS_RUNNING) + { + /* update pid for running status */ + values[Anum_pg_dist_rebalance_jobs_pid - 1] = Int32GetDatum((int32) MyProcPid); + isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + } + else + { + values[Anum_pg_dist_rebalance_jobs_pid - 1] = 0; + isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; + } + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index b4f473a79..933b3523a 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -77,11 +77,12 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" -CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done', 'error'); +CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'done', 'error'); ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; CREATE TABLE citus.pg_dist_rebalance_jobs( jobid bigserial NOT NULL, + pid integer, status pg_catalog.citus_job_status default 'scheduled' NOT NULL, command text NOT NULL, retry_count integer, diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index bc0c71295..4df29d9be 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -858,9 +858,18 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) /* pg_usleep(30 * 1000 * 1000); */ + MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext, + "PerJobContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext); bool hasJobs = true; while (hasJobs) { + MemoryContextReset(perJobContext); + CHECK_FOR_INTERRUPTS(); InvalidateMetadataSystemCache(); @@ -875,11 +884,26 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) } else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { + /* + * We need to load the job into the perJobContext as we will switch contexts + * later due to the committing and starting of new transactions + */ + MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); RebalanceJob *job = GetScheduledRebalanceJob(); + MemoryContextSwitchTo(oldContext); + if (job) { ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); MemoryContext savedContext = CurrentMemoryContext; + + UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING); + PopActiveSnapshot(); + CommitTransactionCommand(); + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + BeginInternalSubTransaction(NULL); PG_TRY(); @@ -919,6 +943,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) CommitTransactionCommand(); ProcessCompletedNotifies(); } + MemoryContextSwitchTo(oldContextPerJob); + MemoryContextDelete(perJobContext); } diff --git a/src/backend/distributed/utils/rebalance_job.c b/src/backend/distributed/utils/rebalance_job.c index 08356bd91..e257f4852 100644 --- a/src/backend/distributed/utils/rebalance_job.c +++ b/src/backend/distributed/utils/rebalance_job.c @@ -25,7 +25,7 @@ citus_wait_for_rebalance_job(PG_FUNCTION_ARGS) { CHECK_FOR_INTERRUPTS(); - RebalanceJob *job = GetScheduledRebalanceJobyJobID(jobid); + RebalanceJob *job = GetScheduledRebalanceJobByJobID(jobid); if (!job) { ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid))); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 7a3957dcf..26b63bee2 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -272,6 +272,7 @@ extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); extern Oid JobStatusScheduledId(void); +extern Oid JobStatusRunningId(void); extern Oid JobStatusDoneId(void); extern Oid JobStatusErrorId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 5c11bbabe..a3cbed1cb 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -208,6 +208,7 @@ typedef enum RebalanceJobStatus { REBALANCE_JOB_STATUS_UNKNOWN, REBALANCE_JOB_STATUS_SCHEDULED, + REBALANCE_JOB_STATUS_RUNNING, REBALANCE_JOB_STATUS_DONE, REBALANCE_JOB_STATUS_ERROR } RebalanceJobStatus; @@ -332,8 +333,9 @@ extern bool HasScheduledRebalanceJobs(void); extern int64 GetNextRebalanceJobId(void); extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command); extern RebalanceJob * GetScheduledRebalanceJob(void); -extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId); +extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); extern void UpdateJobError(RebalanceJob *job, ErrorData *edata); extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); +extern Oid RebalanceJobStatusOid(RebalanceJobStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_rebalance_jobs.h b/src/include/distributed/pg_dist_rebalance_jobs.h index a968d1b89..a5b94745c 100644 --- a/src/include/distributed/pg_dist_rebalance_jobs.h +++ b/src/include/distributed/pg_dist_rebalance_jobs.h @@ -9,6 +9,7 @@ typedef struct FormData_pg_dist_rebalance_job { int64 jobid; + int32 pid; Oid status; #ifdef CATALOG_VARLEN /* variable-length fields start here */ text command; @@ -28,12 +29,13 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job; * compiler constants for pg_dist_rebalance_jobs * ---------------- */ -#define Natts_pg_dist_rebalance_jobs 5 +#define Natts_pg_dist_rebalance_jobs 6 #define Anum_pg_dist_rebalance_jobs_jobid 1 -#define Anum_pg_dist_rebalance_jobs_status 2 -#define Anum_pg_dist_rebalance_jobs_command 3 -#define Anum_pg_dist_rebalance_jobs_retry_count 4 -#define Anum_pg_dist_rebalance_jobs_message 5 +#define Anum_pg_dist_rebalance_jobs_pid 2 +#define Anum_pg_dist_rebalance_jobs_status 3 +#define Anum_pg_dist_rebalance_jobs_command 4 +#define Anum_pg_dist_rebalance_jobs_retry_count 5 +#define Anum_pg_dist_rebalance_jobs_message 6 #define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq"