refactor updating of task tuple to inmemory representation of said tuple

background-job-details
Nils Dijk 2022-08-09 18:11:33 +02:00 committed by Jelte Fennema
parent c09d0feb85
commit 1b66403cb9
3 changed files with 65 additions and 29 deletions

View File

@ -2546,7 +2546,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]); task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]);
if (!nulls[Anum_pg_dist_background_tasks_pid - 1]) if (!nulls[Anum_pg_dist_background_tasks_pid - 1])
{ {
task->pid = palloc0(sizeof(task->pid)); task->pid = palloc0(sizeof(int32));
*(task->pid) = DatumGetInt32(values[Anum_pg_dist_background_tasks_pid - 1]); *(task->pid) = DatumGetInt32(values[Anum_pg_dist_background_tasks_pid - 1]);
} }
task->status = BackgroundTaskStatusByOid( task->status = BackgroundTaskStatusByOid(
@ -2557,7 +2557,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
if (!nulls[Anum_pg_dist_background_tasks_retry_count - 1]) if (!nulls[Anum_pg_dist_background_tasks_retry_count - 1])
{ {
task->retry_count = palloc0(sizeof(task->retry_count)); task->retry_count = palloc0(sizeof(int32));
*(task->retry_count) = DatumGetInt32( *(task->retry_count) = DatumGetInt32(
values[Anum_pg_dist_background_tasks_retry_count - 1]); values[Anum_pg_dist_background_tasks_retry_count - 1]);
} }
@ -2718,8 +2718,7 @@ GetBackgroundTaskByTaskId(int64 taskId)
void void
UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, UpdateBackgroundTask(BackgroundTask *task)
const int32 *retry_count, char *message)
{ {
Relation pgDistBackgroundTasks = Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
@ -2729,9 +2728,9 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
ScanKeyData scanKey[1] = { 0 }; ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true; const bool indexOK = true;
/* WHERE task_id = $taskId */ /* WHERE task_id = $task->taskid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid));
SysScanDesc scanDescriptor = SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks, systable_beginscan(pgDistBackgroundTasks,
@ -2742,7 +2741,7 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
if (!HeapTupleIsValid(heapTuple)) if (!HeapTupleIsValid(heapTuple))
{ {
ereport(ERROR, (errmsg("could not find background task entry for task_id: " ereport(ERROR, (errmsg("could not find background task entry for task_id: "
UINT64_FORMAT, taskId))); UINT64_FORMAT, task->taskid)));
} }
Datum values[Natts_pg_dist_background_tasks] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
@ -2751,37 +2750,40 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
bool updated = false;
#define UPDATE_FIELD(field, newNull, newValue) \ #define UPDATE_FIELD(field, newNull, newValue) \
replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \ replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \
|| (values[(field - 1)] != (newValue))); \ || (values[(field - 1)] != (newValue))); \
isnull[(field - 1)] = (newNull); \ isnull[(field - 1)] = (newNull); \
values[(field - 1)] = (newValue); values[(field - 1)] = (newValue); \
updated |= replace[(field - 1)]
if (pid) if (task->pid)
{ {
UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*pid)); UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*task->pid));
} }
else else
{ {
UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid);
} }
Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(status)); Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(task->status));
UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid);
if (retry_count) if (task->retry_count)
{ {
UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false, Int32GetDatum( UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false,
*retry_count)); Int32GetDatum(*task->retry_count));
} }
else else
{ {
UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, true, InvalidOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, true, InvalidOid);
} }
if (message) if (task->message)
{ {
Oid messageOid = CStringGetTextDatum(message); Oid messageOid = CStringGetTextDatum(task->message);
UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid); UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid);
} }
else else
@ -2791,11 +2793,15 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
#undef UPDATE_FIELD #undef UPDATE_FIELD
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); if (updated)
{
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
CommandCounterIncrement(); CommandCounterIncrement();
}
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistBackgroundTasks, NoLock); table_close(pgDistBackgroundTasks, NoLock);

View File

@ -44,6 +44,30 @@ static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database,
char *command); char *command);
static void ExecuteSqlString(const char *sql); static void ExecuteSqlString(const char *sql);
static void
BackgroundTaskUpdatePid(BackgroundTask *task, pid_t *pid)
{
if (pid)
{
if (!task->pid)
{
MemoryContext taskContext = GetMemoryChunkContext(task);
task->pid = MemoryContextAlloc(taskContext, sizeof(int32));
}
*task->pid = *pid;
}
else
{
/* clear any existing pid */
if (task->pid)
{
pfree(task->pid);
}
task->pid = NULL;
}
}
BackgroundWorkerHandle * BackgroundWorkerHandle *
StartCitusBackgroundTaskMonitorWorker(Oid database, Oid extensionOwner) StartCitusBackgroundTaskMonitorWorker(Oid database, Oid extensionOwner)
{ {
@ -144,14 +168,14 @@ CitusBackgroundTaskMonitorMain(Datum arg)
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* /*
* We need to load the job into the perTaskContext as we will switch contexts * We need to load the task into the perTaskContext as we will switch contexts
* later due to the committing and starting of new transactions * later due to the committing and starting of new transactions
*/ */
MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext); MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext);
BackgroundTask *job = GetRunnableBackgroundTask(); BackgroundTask *task = GetRunnableBackgroundTask();
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if (!job) if (!task)
{ {
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
@ -167,7 +191,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
/* TODO find the actual database and username */ /* TODO find the actual database and username */
BackgroundWorkerHandle *handle = BackgroundWorkerHandle *handle =
StartCitusBackgroundJobExecuter("postgres", "nilsdijk", job->command); StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command);
if (handle == NULL) if (handle == NULL)
{ {
@ -178,20 +202,23 @@ CitusBackgroundTaskMonitorMain(Datum arg)
pid_t pid = 0; pid_t pid = 0;
GetBackgroundWorkerPid(handle, &pid); GetBackgroundWorkerPid(handle, &pid);
ereport(LOG, (errmsg("found job with jobid: %ld", job->taskid))); ereport(LOG, (errmsg("found task with jobid: %ld", task->taskid)));
StartTransactionCommand(); StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* Update job status to indicate it is running */ task->status = BACKGROUND_TASK_STATUS_RUNNING;
UpdateJobStatus(job->taskid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL); BackgroundTaskUpdatePid(task, &pid);
/* Update task status to indicate it is running */
UpdateBackgroundTask(task);
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();
MemoryContextSwitchTo(perTaskContext); MemoryContextSwitchTo(perTaskContext);
/* TODO keep polling the job */ /* TODO keep polling the task */
while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED)
{ {
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
@ -213,8 +240,10 @@ CitusBackgroundTaskMonitorMain(Datum arg)
StartTransactionCommand(); StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot()); PushActiveSnapshot(GetTransactionSnapshot());
/* TODO job can actually also have failed*/ /* TODO task can actually also have failed*/
UpdateJobStatus(job->taskid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL); task->status = BACKGROUND_TASK_STATUS_DONE;
BackgroundTaskUpdatePid(task, NULL);
UpdateBackgroundTask(task);
PopActiveSnapshot(); PopActiveSnapshot();
CommitTransactionCommand(); CommitTransactionCommand();

View File

@ -341,6 +341,7 @@ extern bool BackgroundTaskHasUmnetDependencies(int64 taskId);
extern BackgroundTask * GetRunnableBackgroundTask(void); extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void); extern void ResetRunningBackgroundTasks(void);
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId);
extern void UpdateBackgroundTask(BackgroundTask *task);
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
const int32 *retry_count, char *message); const int32 *retry_count, char *message);
extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata); extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata);