add error state + wait for job udf

background-job-details
Nils Dijk 2022-07-14 15:08:20 +02:00 committed by Jelte Fennema
parent 12b0063c31
commit 12aec202d8
6 changed files with 137 additions and 1 deletions

View File

@ -2264,11 +2264,34 @@ RebalanceJobStatusByOid(Oid enumOid)
{
return REBALANCE_JOB_STATUS_SCHEDULED;
}
else if (enumOid == JobStatusErrorId())
{
return REBALANCE_JOB_STATUS_ERROR;
}
ereport(ERROR, (errmsg("unknown enum value for citus_job_status")));
return REBALANCE_JOB_STATUS_UNKNOWN;
}
bool
IsRebalanceJobStatusTerminal(RebalanceJobStatus status)
{
switch (status)
{
case REBALANCE_JOB_STATUS_DONE:
case REBALANCE_JOB_STATUS_ERROR:
{
return true;
}
default:
{
return false;
}
}
}
static Oid
RebalanceJobStatusOid(RebalanceJobStatus status)
{
@ -2338,6 +2361,52 @@ GetScheduledRebalanceJob(void)
}
RebalanceJob *
GetScheduledRebalanceJobyJobID(int64 jobId)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.jobid == $jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
RebalanceJob *job = NULL;
if (HeapTupleIsValid(jobTuple))
{
Form_pg_dist_rebalance_job jobData = NULL;
jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple);
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobData->jobid;
job->status = RebalanceJobStatusByOid(jobData->status);
/* TODO parse the actual job */
Datum datumArray[Natts_pg_dist_rebalance_jobs];
bool isNullArray[Natts_pg_dist_rebalance_jobs];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1]));
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
return job;
}
void
UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
{

View File

@ -91,3 +91,5 @@ CREATE TABLE citus.pg_dist_rebalance_jobs(
ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid);
CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid);
#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql"

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$;
COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job()
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$;
COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job()
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job() TO PUBLIC;

View File

@ -0,0 +1,46 @@
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "utils/wait_event.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
/* pg_catalog.citus_wait_for_rebalance_job(jobid bigint) */
PG_FUNCTION_INFO_V1(citus_wait_for_rebalance_job);
Datum
citus_wait_for_rebalance_job(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 jobid = PG_GETARG_INT64(0);
for (;;)
{
CHECK_FOR_INTERRUPTS();
RebalanceJob *job = GetScheduledRebalanceJobyJobID(jobid);
if (!job)
{
ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid)));
}
if (IsRebalanceJobStatusTerminal(job->status))
{
PG_RETURN_VOID();
}
/* not finished, sleeping */
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1000, /* TODO, what timeout would be good here */
WAIT_EVENT_PG_SLEEP);
ResetLatch(MyLatch);
}
}

View File

@ -208,7 +208,8 @@ typedef enum RebalanceJobStatus
{
REBALANCE_JOB_STATUS_UNKNOWN,
REBALANCE_JOB_STATUS_SCHEDULED,
REBALANCE_JOB_STATUS_DONE
REBALANCE_JOB_STATUS_DONE,
REBALANCE_JOB_STATUS_ERROR
} RebalanceJobStatus;
@ -329,6 +330,8 @@ extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
extern void UpdateJobError(RebalanceJob *job, ErrorData *edata);
extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status);
#endif /* METADATA_UTILITY_H */