From 12aec202d8e437e97038697b58a7e6de45b873a0 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Thu, 14 Jul 2022 15:08:20 +0200 Subject: [PATCH] add error state + wait for job udf --- .../distributed/metadata/metadata_utility.c | 69 +++++++++++++++++++ .../distributed/sql/citus--11.0-4--11.1-1.sql | 2 + .../citus_wait_for_rebalance_job/11.1-1.sql | 8 +++ .../citus_wait_for_rebalance_job/latest.sql | 8 +++ src/backend/distributed/utils/rebalance_job.c | 46 +++++++++++++ src/include/distributed/metadata_utility.h | 5 +- 6 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql create mode 100644 src/backend/distributed/utils/rebalance_job.c diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 8adb70a1d..12144b152 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2264,11 +2264,34 @@ RebalanceJobStatusByOid(Oid enumOid) { return REBALANCE_JOB_STATUS_SCHEDULED; } + else if (enumOid == JobStatusErrorId()) + { + return REBALANCE_JOB_STATUS_ERROR; + } ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); return REBALANCE_JOB_STATUS_UNKNOWN; } +bool +IsRebalanceJobStatusTerminal(RebalanceJobStatus status) +{ + switch (status) + { + case REBALANCE_JOB_STATUS_DONE: + case REBALANCE_JOB_STATUS_ERROR: + { + return true; + } + + default: + { + return false; + } + } +} + + static Oid RebalanceJobStatusOid(RebalanceJobStatus status) { @@ -2338,6 +2361,52 @@ GetScheduledRebalanceJob(void) } +RebalanceJob * +GetScheduledRebalanceJobyJobID(int64 jobId) +{ + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + AccessShareLock); + + /* pg_dist_rebalance_jobs.jobid == $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsStatusJobsIdIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + HeapTuple jobTuple = systable_getnext(scanDescriptor); + RebalanceJob *job = NULL; + if (HeapTupleIsValid(jobTuple)) + { + Form_pg_dist_rebalance_job jobData = NULL; + jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple); + + job = palloc0(sizeof(RebalanceJob)); + job->jobid = jobData->jobid; + job->status = RebalanceJobStatusByOid(jobData->status); + + /* TODO parse the actual job */ + Datum datumArray[Natts_pg_dist_rebalance_jobs]; + bool isNullArray[Natts_pg_dist_rebalance_jobs]; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); + + job->command = text_to_cstring( + DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); + } + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobs, AccessShareLock); + + return job; +} + + void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) { diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index f8a0cae42..b4f473a79 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -91,3 +91,5 @@ CREATE TABLE citus.pg_dist_rebalance_jobs( ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog; CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid); CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid); + +#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql new file mode 100644 index 000000000..134ae1b82 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$; +COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() + IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql new file mode 100644 index 000000000..134ae1b82 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$; +COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() + IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() TO PUBLIC; diff --git a/src/backend/distributed/utils/rebalance_job.c b/src/backend/distributed/utils/rebalance_job.c new file mode 100644 index 000000000..08356bd91 --- /dev/null +++ b/src/backend/distributed/utils/rebalance_job.c @@ -0,0 +1,46 @@ + + +#include "postgres.h" + +#include "fmgr.h" +#include "miscadmin.h" + +#include "utils/wait_event.h" + +#include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" + +/* pg_catalog.citus_wait_for_rebalance_job(jobid bigint) */ +PG_FUNCTION_INFO_V1(citus_wait_for_rebalance_job); + +Datum +citus_wait_for_rebalance_job(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 jobid = PG_GETARG_INT64(0); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + RebalanceJob *job = GetScheduledRebalanceJobyJobID(jobid); + if (!job) + { + ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid))); + } + + if (IsRebalanceJobStatusTerminal(job->status)) + { + PG_RETURN_VOID(); + } + + /* not finished, sleeping */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000, /* TODO, what timeout would be good here */ + WAIT_EVENT_PG_SLEEP); + ResetLatch(MyLatch); + } +} diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 109234e83..01197318a 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -208,7 +208,8 @@ typedef enum RebalanceJobStatus { REBALANCE_JOB_STATUS_UNKNOWN, REBALANCE_JOB_STATUS_SCHEDULED, - REBALANCE_JOB_STATUS_DONE + REBALANCE_JOB_STATUS_DONE, + REBALANCE_JOB_STATUS_ERROR } RebalanceJobStatus; @@ -329,6 +330,8 @@ extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasScheduledRebalanceJobs(void); extern RebalanceJob * GetScheduledRebalanceJob(void); +extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); extern void UpdateJobError(RebalanceJob *job, ErrorData *edata); +extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); #endif /* METADATA_UTILITY_H */