mirror of https://github.com/citusdata/citus.git
refactor inner loop of background worker thing
parent
5a2ec73475
commit
80e2edf1a5
|
@ -912,57 +912,60 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
|
||||||
RebalanceJob *job = GetRunableRebalanceJob();
|
RebalanceJob *job = GetRunableRebalanceJob();
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
if (job)
|
if (!job)
|
||||||
{
|
{
|
||||||
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
|
|
||||||
MemoryContext savedContext = CurrentMemoryContext;
|
|
||||||
|
|
||||||
UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING);
|
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
|
|
||||||
|
hasJobs = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
|
||||||
|
|
||||||
|
/* Update job status to indicate it is running */
|
||||||
|
UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING);
|
||||||
|
|
||||||
|
PopActiveSnapshot();
|
||||||
|
CommitTransactionCommand();
|
||||||
|
|
||||||
|
MemoryContext savedContext = CurrentMemoryContext;
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
StartTransactionCommand();
|
||||||
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
if (ExecuteRebalanceJob(job))
|
||||||
|
{
|
||||||
|
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
|
||||||
|
|
||||||
|
PopActiveSnapshot();
|
||||||
|
CommitTransactionCommand();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
MemoryContextSwitchTo(savedContext);
|
||||||
|
|
||||||
|
ErrorData *edata = CopyErrorData();
|
||||||
|
FlushErrorState();
|
||||||
|
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
PushActiveSnapshot(GetTransactionSnapshot());
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
|
||||||
BeginInternalSubTransaction(NULL);
|
UpdateJobError(job, edata);
|
||||||
|
|
||||||
PG_TRY();
|
PopActiveSnapshot();
|
||||||
{
|
CommitTransactionCommand();
|
||||||
if (ExecuteRebalanceJob(job))
|
|
||||||
{
|
|
||||||
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
|
|
||||||
}
|
|
||||||
|
|
||||||
ReleaseCurrentSubTransaction();
|
FreeErrorData(edata);
|
||||||
}
|
edata = NULL;
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
MemoryContextSwitchTo(savedContext);
|
|
||||||
|
|
||||||
ErrorData *edata = CopyErrorData();
|
/* TODO log that there was an error */
|
||||||
FlushErrorState();
|
|
||||||
|
|
||||||
RollbackAndReleaseCurrentSubTransaction();
|
|
||||||
|
|
||||||
UpdateJobError(job, edata);
|
|
||||||
|
|
||||||
FreeErrorData(edata);
|
|
||||||
edata = NULL;
|
|
||||||
|
|
||||||
/* TODO log that there was an error */
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
hasJobs = false;
|
|
||||||
}
|
}
|
||||||
|
PG_END_TRY();
|
||||||
}
|
}
|
||||||
|
|
||||||
PopActiveSnapshot();
|
|
||||||
CommitTransactionCommand();
|
|
||||||
ProcessCompletedNotifies();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContextPerJob);
|
MemoryContextSwitchTo(oldContextPerJob);
|
||||||
MemoryContextDelete(perJobContext);
|
MemoryContextDelete(perJobContext);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue