diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 70dea09fc..e32337fe2 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2546,7 +2546,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]); if (!nulls[Anum_pg_dist_background_tasks_pid - 1]) { - task->pid = palloc0(sizeof(task->pid)); + task->pid = palloc0(sizeof(int32)); *(task->pid) = DatumGetInt32(values[Anum_pg_dist_background_tasks_pid - 1]); } task->status = BackgroundTaskStatusByOid( @@ -2557,7 +2557,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) if (!nulls[Anum_pg_dist_background_tasks_retry_count - 1]) { - task->retry_count = palloc0(sizeof(task->retry_count)); + task->retry_count = palloc0(sizeof(int32)); *(task->retry_count) = DatumGetInt32( values[Anum_pg_dist_background_tasks_retry_count - 1]); } @@ -2718,8 +2718,7 @@ GetBackgroundTaskByTaskId(int64 taskId) void -UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, - const int32 *retry_count, char *message) +UpdateBackgroundTask(BackgroundTask *task) { Relation pgDistBackgroundTasks = table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); @@ -2729,9 +2728,9 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, ScanKeyData scanKey[1] = { 0 }; const bool indexOK = true; - /* WHERE task_id = $taskId */ + /* WHERE task_id = $task->taskid */ ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid)); SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, @@ -2742,7 +2741,7 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, if (!HeapTupleIsValid(heapTuple)) { ereport(ERROR, (errmsg("could not find background task entry for task_id: " - UINT64_FORMAT, taskId))); + UINT64_FORMAT, task->taskid))); } Datum values[Natts_pg_dist_background_tasks] = { 0 }; @@ -2751,37 +2750,40 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); + bool updated = false; + #define UPDATE_FIELD(field, newNull, newValue) \ replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \ || (values[(field - 1)] != (newValue))); \ isnull[(field - 1)] = (newNull); \ - values[(field - 1)] = (newValue); + values[(field - 1)] = (newValue); \ + updated |= replace[(field - 1)] - if (pid) + if (task->pid) { - UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*pid)); + UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, false, Int32GetDatum(*task->pid)); } else { UPDATE_FIELD(Anum_pg_dist_background_tasks_pid, true, InvalidOid); } - Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(status)); + Oid statusOid = ObjectIdGetDatum(CitusTaskStatusOid(task->status)); UPDATE_FIELD(Anum_pg_dist_background_tasks_status, false, statusOid); - if (retry_count) + if (task->retry_count) { - UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false, Int32GetDatum( - *retry_count)); + UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, false, + Int32GetDatum(*task->retry_count)); } else { UPDATE_FIELD(Anum_pg_dist_background_tasks_retry_count, true, InvalidOid); } - if (message) + if (task->message) { - Oid messageOid = CStringGetTextDatum(message); + Oid messageOid = CStringGetTextDatum(task->message); UPDATE_FIELD(Anum_pg_dist_background_tasks_message, false, messageOid); } else @@ -2791,11 +2793,15 @@ UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, #undef UPDATE_FIELD - heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + if (updated) + { + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); - CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); + CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); - CommandCounterIncrement(); + CommandCounterIncrement(); + } systable_endscan(scanDescriptor); table_close(pgDistBackgroundTasks, NoLock); diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 4f7a76783..49dbfb280 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -44,6 +44,30 @@ static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database, char *command); static void ExecuteSqlString(const char *sql); +static void +BackgroundTaskUpdatePid(BackgroundTask *task, pid_t *pid) +{ + if (pid) + { + if (!task->pid) + { + MemoryContext taskContext = GetMemoryChunkContext(task); + task->pid = MemoryContextAlloc(taskContext, sizeof(int32)); + } + *task->pid = *pid; + } + else + { + /* clear any existing pid */ + if (task->pid) + { + pfree(task->pid); + } + task->pid = NULL; + } +} + + BackgroundWorkerHandle * StartCitusBackgroundTaskMonitorWorker(Oid database, Oid extensionOwner) { @@ -144,14 +168,14 @@ CitusBackgroundTaskMonitorMain(Datum arg) PushActiveSnapshot(GetTransactionSnapshot()); /* - * We need to load the job into the perTaskContext as we will switch contexts + * We need to load the task into the perTaskContext as we will switch contexts * later due to the committing and starting of new transactions */ MemoryContext oldContext = MemoryContextSwitchTo(perTaskContext); - BackgroundTask *job = GetRunnableBackgroundTask(); + BackgroundTask *task = GetRunnableBackgroundTask(); MemoryContextSwitchTo(oldContext); - if (!job) + if (!task) { PopActiveSnapshot(); CommitTransactionCommand(); @@ -167,7 +191,7 @@ CitusBackgroundTaskMonitorMain(Datum arg) /* TODO find the actual database and username */ BackgroundWorkerHandle *handle = - StartCitusBackgroundJobExecuter("postgres", "nilsdijk", job->command); + StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command); if (handle == NULL) { @@ -178,20 +202,23 @@ CitusBackgroundTaskMonitorMain(Datum arg) pid_t pid = 0; GetBackgroundWorkerPid(handle, &pid); - ereport(LOG, (errmsg("found job with jobid: %ld", job->taskid))); + ereport(LOG, (errmsg("found task with jobid: %ld", task->taskid))); StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - /* Update job status to indicate it is running */ - UpdateJobStatus(job->taskid, &pid, BACKGROUND_TASK_STATUS_RUNNING, NULL, NULL); + task->status = BACKGROUND_TASK_STATUS_RUNNING; + BackgroundTaskUpdatePid(task, &pid); + + /* Update task status to indicate it is running */ + UpdateBackgroundTask(task); PopActiveSnapshot(); CommitTransactionCommand(); MemoryContextSwitchTo(perTaskContext); - /* TODO keep polling the job */ + /* TODO keep polling the task */ while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) { int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; @@ -213,8 +240,10 @@ CitusBackgroundTaskMonitorMain(Datum arg) StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - /* TODO job can actually also have failed*/ - UpdateJobStatus(job->taskid, NULL, BACKGROUND_TASK_STATUS_DONE, NULL, NULL); + /* TODO task can actually also have failed*/ + task->status = BACKGROUND_TASK_STATUS_DONE; + BackgroundTaskUpdatePid(task, NULL); + UpdateBackgroundTask(task); PopActiveSnapshot(); CommitTransactionCommand(); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 1265f5a84..2c48f7a50 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -341,6 +341,7 @@ extern bool BackgroundTaskHasUmnetDependencies(int64 taskId); extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); +extern void UpdateBackgroundTask(BackgroundTask *task); extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, const int32 *retry_count, char *message); extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata);