add jobs tracking table

background-job-details
Nils Dijk 2022-08-19 16:08:02 +02:00 committed by Jelte Fennema
parent 624e29e9e9
commit a23d340fbe
15 changed files with 322 additions and 110 deletions

View File

@ -141,12 +141,18 @@ typedef struct MetadataCacheData
bool extensionLoaded; bool extensionLoaded;
Oid distShardRelationId; Oid distShardRelationId;
Oid distPlacementRelationId; Oid distPlacementRelationId;
Oid distBackgroundJobsRelationId;
Oid distBackgroundTasksRelationId; Oid distBackgroundTasksRelationId;
Oid distBackgroundTasksTaskIdIndexId; Oid distBackgroundTasksTaskIdIndexId;
Oid distBackgroundTasksStatusTaskIdIndexId; Oid distBackgroundTasksStatusTaskIdIndexId;
Oid distBackgroundTasksDependRelationId; Oid distBackgroundTasksDependRelationId;
Oid distBackgroundTasksDependTaskIdIndexId; Oid distBackgroundTasksDependTaskIdIndexId;
Oid distBackgroundTasksDependDependsOnIndexId; Oid distBackgroundTasksDependDependsOnIndexId;
Oid citusJobStatusScheduledId;
Oid citusJobStatusRunningId;
Oid citusJobStatusFinishedId;
Oid citusJobStatusCancelledId;
Oid citusJobStatusFailedId;
Oid citusTaskStatusScheduledId; Oid citusTaskStatusScheduledId;
Oid citusTaskStatusRunningId; Oid citusTaskStatusRunningId;
Oid citusTaskStatusDoneId; Oid citusTaskStatusDoneId;
@ -2375,6 +2381,16 @@ DistLocalGroupIdRelationId(void)
} }
Oid
DistBackgroundJobsRelationId(void)
{
CachedRelationLookup("pg_dist_background_jobs",
&MetadataCache.distBackgroundJobsRelationId);
return MetadataCache.distBackgroundJobsRelationId;
}
Oid Oid
DistBackgroundTasksRelationId(void) DistBackgroundTasksRelationId(void)
{ {
@ -3157,6 +3173,71 @@ SecondaryNodeRoleId(void)
} }
Oid
CitusJobStatusScheduledId(void)
{
if (!MetadataCache.citusJobStatusScheduledId)
{
MetadataCache.citusJobStatusScheduledId =
LookupStringEnumValueId("citus_job_status", "scheduled");
}
return MetadataCache.citusJobStatusScheduledId;
}
Oid
CitusJobStatusRunningId(void)
{
if (!MetadataCache.citusJobStatusRunningId)
{
MetadataCache.citusJobStatusRunningId =
LookupStringEnumValueId("citus_job_status", "running");
}
return MetadataCache.citusJobStatusRunningId;
}
Oid
CitusJobStatusFinisehdId(void)
{
if (!MetadataCache.citusJobStatusFinishedId)
{
MetadataCache.citusJobStatusFinishedId =
LookupStringEnumValueId("citus_job_status", "finished");
}
return MetadataCache.citusJobStatusFinishedId;
}
Oid
CitusJobStatusCancelledId(void)
{
if (!MetadataCache.citusJobStatusCancelledId)
{
MetadataCache.citusJobStatusCancelledId =
LookupStringEnumValueId("citus_job_status", "cancelled");
}
return MetadataCache.citusJobStatusCancelledId;
}
Oid
CitusJobStatusFailedId(void)
{
if (!MetadataCache.citusJobStatusFailedId)
{
MetadataCache.citusJobStatusFailedId =
LookupStringEnumValueId("citus_job_status", "failed");
}
return MetadataCache.citusJobStatusFailedId;
}
Oid Oid
CitusTaskStatusScheduledId(void) CitusTaskStatusScheduledId(void)
{ {

View File

@ -43,10 +43,11 @@
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_background_jobs.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_background_tasks.h" #include "distributed/pg_dist_background_tasks.h"
#include "distributed/pg_dist_backrgound_tasks_depend.h" #include "distributed/pg_dist_backrgound_tasks_depend.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
@ -2352,6 +2353,29 @@ CitusTaskStatusOid(BackgroundTaskStatus status)
} }
int64
GetNextBackgroundJobsJobId(void)
{
text *sequenceName = cstring_to_text(PG_DIST_BACKGROUND_JOBS_JOB_ID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
uint64 jobId = DatumGetInt64(jobIdOid);
return jobId;
}
int64 int64
GetNextBackgroundTaskTaskId(void) GetNextBackgroundTaskTaskId(void)
{ {
@ -2375,8 +2399,53 @@ GetNextBackgroundTaskTaskId(void)
} }
int64
CreateBackgroundJob(const char *jobType, const char *description)
{
Relation pgDistBackgroundJobs =
table_open(DistBackgroundJobsRelationId(), RowExclusiveLock);
/* insert new job */
Datum values[Natts_pg_dist_background_jobs] = { 0 };
bool nulls[Natts_pg_dist_background_jobs] = { 0 };
memset(nulls, true, sizeof(nulls));
int64 jobId = GetNextBackgroundJobsJobId();
values[Anum_pg_dist_background_jobs_job_id - 1] = Int64GetDatum(jobId);
nulls[Anum_pg_dist_background_jobs_job_id - 1] = false;
values[Anum_pg_dist_background_jobs_state - 1] = CitusJobStatusScheduledId();
nulls[Anum_pg_dist_background_jobs_state - 1] = false;
if (jobType)
{
NameData jobTypeName = { 0 };
namestrcpy(&jobTypeName, jobType);
values[Anum_pg_dist_background_jobs_job_type - 1] = NameGetDatum(&jobTypeName);
nulls[Anum_pg_dist_background_jobs_job_type - 1] = false;
}
if (description)
{
values[Anum_pg_dist_background_jobs_description - 1] =
CStringGetTextDatum(description);
nulls[Anum_pg_dist_background_jobs_description - 1] = false;
}
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundJobs),
values, nulls);
CatalogTupleInsert(pgDistBackgroundJobs, newTuple);
table_close(pgDistBackgroundJobs, NoLock);
return jobId;
}
BackgroundTask * BackgroundTask *
ScheduleBackgroundTask(char *command, int dependingTaskCount, ScheduleBackgroundTask(int64 jobId, char *command, int dependingTaskCount,
int64 dependingTaskIds[]) int64 dependingTaskIds[])
{ {
BackgroundTask *task = NULL; BackgroundTask *task = NULL;
@ -2401,6 +2470,9 @@ ScheduleBackgroundTask(char *command, int dependingTaskCount,
int64 taskId = GetNextBackgroundTaskTaskId(); int64 taskId = GetNextBackgroundTaskTaskId();
values[Anum_pg_dist_background_tasks_job_id - 1] = Int64GetDatum(jobId);
nulls[Anum_pg_dist_background_tasks_job_id - 1] = false;
values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(taskId); values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(taskId);
nulls[Anum_pg_dist_background_tasks_task_id - 1] = false; nulls[Anum_pg_dist_background_tasks_task_id - 1] = false;
@ -2430,6 +2502,10 @@ ScheduleBackgroundTask(char *command, int dependingTaskCount,
bool nulls[Natts_pg_dist_background_tasks_depend] = { 0 }; bool nulls[Natts_pg_dist_background_tasks_depend] = { 0 };
memset(nulls, true, sizeof(nulls)); memset(nulls, true, sizeof(nulls));
values[Anum_pg_dist_background_tasks_depend_job_id - 1] =
Int64GetDatum(jobId);
nulls[Anum_pg_dist_background_tasks_depend_job_id - 1] = false;
values[Anum_pg_dist_background_tasks_depend_task_id - 1] = values[Anum_pg_dist_background_tasks_depend_task_id - 1] =
Int64GetDatum(task->taskid); Int64GetDatum(task->taskid);
nulls[Anum_pg_dist_background_tasks_depend_task_id - 1] = false; nulls[Anum_pg_dist_background_tasks_depend_task_id - 1] = false;
@ -2543,6 +2619,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
BackgroundTask *task = palloc0(sizeof(BackgroundTask)); BackgroundTask *task = palloc0(sizeof(BackgroundTask));
task->jobid = DatumGetInt64(values[Anum_pg_dist_background_tasks_job_id - 1]);
task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]); task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]);
if (!nulls[Anum_pg_dist_background_tasks_pid - 1]) if (!nulls[Anum_pg_dist_background_tasks_pid - 1])
{ {
@ -2573,25 +2650,28 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
bool bool
BackgroundTaskHasUmnetDependencies(int64 taskId) BackgroundTaskHasUmnetDependencies(int64 jobId, int64 taskId)
{ {
bool hasUnmetDependency = false; bool hasUnmetDependency = false;
Relation pgDistBackgroundTasksDepend = Relation pgDistBackgroundTasksDepend =
table_open(DistBackgroundTasksDependRelationId(), AccessShareLock); table_open(DistBackgroundTasksDependRelationId(), AccessShareLock);
const int scanKeyCount = 1; ScanKeyData scanKey[2] = { 0 };
ScanKeyData scanKey[1] = { 0 };
bool indexOK = true; bool indexOK = true;
/* pg_catalog.pg_dist_background_tasks_depend.job_id = jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_job_id,
BTEqualStrategyNumber, F_INT8EQ, jobId);
/* pg_catalog.pg_dist_background_tasks_depend.task_id = $taskId */ /* pg_catalog.pg_dist_background_tasks_depend.task_id = $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_task_id, ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_depend_task_id,
BTEqualStrategyNumber, F_INT8EQ, taskId); BTEqualStrategyNumber, F_INT8EQ, taskId);
SysScanDesc scanDescriptor = SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasksDepend, systable_beginscan(pgDistBackgroundTasksDepend,
DistBackgroundTasksDependTaskIdIndexId(), DistBackgroundTasksDependTaskIdIndexId(),
indexOK, NULL, scanKeyCount, indexOK, NULL, lengthof(scanKey),
scanKey); scanKey);
HeapTuple dependTuple = NULL; HeapTuple dependTuple = NULL;
@ -2600,7 +2680,8 @@ BackgroundTaskHasUmnetDependencies(int64 taskId)
Form_pg_dist_background_tasks_depend depends = Form_pg_dist_background_tasks_depend depends =
(Form_pg_dist_background_tasks_depend) GETSTRUCT(dependTuple); (Form_pg_dist_background_tasks_depend) GETSTRUCT(dependTuple);
BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(depends->depends_on); BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(jobId,
depends->depends_on);
/* /*
* Only when the status of all depending jobs is done we clear this job and say * Only when the status of all depending jobs is done we clear this job and say
@ -2664,7 +2745,7 @@ GetRunnableBackgroundTask(void)
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{ {
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
if (!BackgroundTaskHasUmnetDependencies(task->taskid)) if (!BackgroundTaskHasUmnetDependencies(task->jobid, task->taskid))
{ {
/* found task, close table and return */ /* found task, close table and return */
break; break;
@ -2684,23 +2765,26 @@ GetRunnableBackgroundTask(void)
BackgroundTask * BackgroundTask *
GetBackgroundTaskByTaskId(int64 taskId) GetBackgroundTaskByTaskId(int64 jobId, int64 taskId)
{ {
const int scanKeyCount = 1; ScanKeyData scanKey[2] = { 0 };
ScanKeyData scanKey[1];
bool indexOK = true; bool indexOK = true;
Relation pgDistBackgroundTasks = Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), AccessShareLock); table_open(DistBackgroundTasksRelationId(), AccessShareLock);
/* pg_dist_background_tasks.job_id == $jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
/* pg_dist_background_tasks.task_id == $taskId */ /* pg_dist_background_tasks.task_id == $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId));
SysScanDesc scanDescriptor = SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks, systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksStatusTaskIdIndexId(), DistBackgroundTasksTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey); indexOK, NULL, lengthof(scanKey), scanKey);
HeapTuple taskTuple = systable_getnext(scanDescriptor); HeapTuple taskTuple = systable_getnext(scanDescriptor);
BackgroundTask *task = NULL; BackgroundTask *task = NULL;
@ -2724,24 +2808,28 @@ UpdateBackgroundTask(BackgroundTask *task)
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
int scanKeyCount = 1; ScanKeyData scanKey[2] = { 0 };
ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true; const bool indexOK = true;
/* WHERE job_id = $task->jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->jobid));
/* WHERE task_id = $task->taskid */ /* WHERE task_id = $task->taskid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid));
SysScanDesc scanDescriptor = SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasks, systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksTaskIdIndexId(), DistBackgroundTasksTaskIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey); indexOK, NULL, lengthof(scanKey), scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple)) if (!HeapTupleIsValid(heapTuple))
{ {
ereport(ERROR, (errmsg("could not find background task entry for task_id: " ereport(ERROR, (errmsg("could not find background task entry for :"
UINT64_FORMAT, task->taskid))); "job_id/task_id: " UINT64_FORMAT "/" UINT64_FORMAT,
task->jobid, task->taskid)));
} }
Datum values[Natts_pg_dist_background_tasks] = { 0 }; Datum values[Natts_pg_dist_background_tasks] = { 0 };
@ -2753,11 +2841,13 @@ UpdateBackgroundTask(BackgroundTask *task)
bool updated = false; bool updated = false;
#define UPDATE_FIELD(field, newNull, newValue) \ #define UPDATE_FIELD(field, newNull, newValue) \
replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \ { \
|| (values[(field - 1)] != (newValue))); \ replace[((field) - 1)] = (((newNull) != isnull[((field) - 1)]) \
isnull[(field - 1)] = (newNull); \ || (values[((field) - 1)] != (newValue))); \
values[(field - 1)] = (newValue); \ isnull[((field) - 1)] = (newNull); \
updated |= replace[(field - 1)] values[((field) - 1)] = (newValue); \
updated |= replace[((field) - 1)]; \
}
if (task->pid) if (task->pid)
{ {
@ -2836,24 +2926,27 @@ UpdateBackgroundTask(BackgroundTask *task)
static List * static List *
GetDependantTasks(int64 taskId) GetDependantTasks(int64 jobId, int64 taskId)
{ {
Relation pgDistBackgroundTasksDepends = Relation pgDistBackgroundTasksDepends =
table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock); table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock);
ScanKeyData scanKey[1] = { 0 }; ScanKeyData scanKey[2] = { 0 };
int scanKeyCount = 1;
const bool indexOK = true; const bool indexOK = true;
/* pg_dist_background_tasks_depend.job_id = $jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_job_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
/* pg_dist_background_tasks_depend.depends_on = $taskId */ /* pg_dist_background_tasks_depend.depends_on = $taskId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_depends_on, ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_depend_depends_on,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId));
SysScanDesc scanDescriptor = SysScanDesc scanDescriptor =
systable_beginscan(pgDistBackgroundTasksDepends, systable_beginscan(pgDistBackgroundTasksDepends,
DistBackgroundTasksDependDependsOnIndexId(), DistBackgroundTasksDependDependsOnIndexId(),
indexOK, indexOK,
NULL, scanKeyCount, scanKey); NULL, lengthof(scanKey), scanKey);
List *dependantTasks = NIL; List *dependantTasks = NIL;
HeapTuple heapTuple = NULL; HeapTuple heapTuple = NULL;
@ -2876,13 +2969,13 @@ GetDependantTasks(int64 taskId)
void void
UnscheduleDependantTasks(int64 taskId) UnscheduleDependantTasks(int64 jobId, int64 taskId)
{ {
Relation pgDistBackgroundTasks = Relation pgDistBackgroundTasks =
table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); table_open(DistBackgroundTasksRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
List *dependantTasks = GetDependantTasks(taskId); List *dependantTasks = GetDependantTasks(jobId, taskId);
while (list_length(dependantTasks) > 0) while (list_length(dependantTasks) > 0)
{ {
/* pop last item from stack */ /* pop last item from stack */
@ -2890,21 +2983,24 @@ UnscheduleDependantTasks(int64 taskId)
dependantTasks = list_delete_last(dependantTasks); dependantTasks = list_delete_last(dependantTasks);
/* push new dependant tasks on to stack */ /* push new dependant tasks on to stack */
dependantTasks = list_concat(dependantTasks, GetDependantTasks(cTaskId)); dependantTasks = list_concat(dependantTasks, GetDependantTasks(jobId, cTaskId));
/* unschedule current task */ /* unschedule current task */
{ {
ScanKeyData scanKey[1] = { 0 }; ScanKeyData scanKey[2] = { 0 };
int scanKeyCount = 1;
/* WHERE taskId = job->taskId */ /* WHERE jobId = $jobId */
ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId));
/* WHERE taskId = task->taskId */
ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId)); BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId));
const bool indexOK = true; const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTasksTaskIdIndexId(), DistBackgroundTasksTaskIdIndexId(),
indexOK, indexOK, NULL,
NULL, scanKeyCount, scanKey); lengthof(scanKey), scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple)) if (!HeapTupleIsValid(heapTuple))

View File

@ -1560,7 +1560,14 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
DirectFunctionCall1(enum_out, shardReplicationModeOid); DirectFunctionCall1(enum_out, shardReplicationModeOid);
char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum); char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum);
if (list_length(placementUpdateList) == 0)
{
return;
}
/* schedule planned moves */ /* schedule planned moves */
int64 jobId = CreateBackgroundJob("rebalance", "Rebalance colocation group ...");
PlacementUpdateEvent *move = NULL; PlacementUpdateEvent *move = NULL;
StringInfoData buf = { 0 }; StringInfoData buf = { 0 };
initStringInfo(&buf); initStringInfo(&buf);
@ -1579,9 +1586,9 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
move->targetNode->workerPort, move->targetNode->workerPort,
quote_literal_cstr(shardTranferModeLabel)); quote_literal_cstr(shardTranferModeLabel));
BackgroundTask *job = ScheduleBackgroundTask(buf.data, first ? 0 : 1, BackgroundTask *task = ScheduleBackgroundTask(jobId, buf.data, first ? 0 : 1,
&prevJobId); &prevJobId);
prevJobId = job->taskid; prevJobId = task->taskid;
first = false; first = false;
} }

View File

@ -77,10 +77,25 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failed');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_background_jobs (
job_id bigserial NOT NULL,
state pg_catalog.citus_job_status DEFAULT 'scheduled' NOT NULL,
job_type name,
description text,
started_at timestamptz,
finished_at timestamptz
);
ALTER TABLE citus.pg_dist_background_jobs SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_background_jobs_job_id_index ON pg_catalog.pg_dist_background_jobs using btree(job_id);
CREATE TYPE citus.citus_task_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); CREATE TYPE citus.citus_task_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled');
ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog; ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_background_tasks( CREATE TABLE citus.pg_dist_background_tasks(
job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_jobs(job_id),
task_id bigserial NOT NULL, task_id bigserial NOT NULL,
pid integer, pid integer,
status pg_catalog.citus_task_status default 'scheduled' NOT NULL, status pg_catalog.citus_task_status default 'scheduled' NOT NULL,
@ -88,20 +103,22 @@ CREATE TABLE citus.pg_dist_background_tasks(
retry_count integer, retry_count integer,
message text message text
); );
ALTER TABLE citus.pg_dist_background_tasks SET SCHEMA pg_catalog; ALTER TABLE citus.pg_dist_background_tasks SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(task_id); CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(job_id, task_id);
CREATE INDEX pg_dist_background_tasks_status_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(status, task_id); CREATE INDEX pg_dist_background_tasks_status_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(status, task_id);
CREATE TABLE citus.pg_dist_background_tasks_depend( CREATE TABLE citus.pg_dist_background_tasks_depend(
task_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_jobs(job_id) ON DELETE CASCADE,
depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, task_id bigint NOT NULL,
depends_on bigint NOT NULL,
UNIQUE(task_id, depends_on) UNIQUE(job_id, task_id, depends_on),
FOREIGN KEY (job_id, task_id) REFERENCES pg_catalog.pg_dist_background_tasks (job_id, task_id) ON DELETE CASCADE,
FOREIGN KEY (job_id, depends_on) REFERENCES pg_catalog.pg_dist_background_tasks (job_id, task_id) ON DELETE CASCADE
); );
ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog; ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(task_id); CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, task_id);
CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(depends_on); CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, depends_on);
#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" #include "udfs/citus_job_wait/11.1-1.sql"

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_jobs_wait(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_jobs_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint)
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) TO PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE FUNCTION pg_catalog.citus_jobs_wait(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_jobs_wait$$;
COMMENT ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint)
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) TO PUBLIC;

View File

@ -1,8 +0,0 @@
CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$;
COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) TO PUBLIC;

View File

@ -1,8 +0,0 @@
CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$;
COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint)
IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) TO PUBLIC;

View File

@ -326,7 +326,7 @@ CitusBackgroundTaskMonitorMain(Datum arg)
task->status = BACKGROUND_TASK_STATUS_ERROR; task->status = BACKGROUND_TASK_STATUS_ERROR;
/* when we error this task, we need to unschedule all dependant tasks */ /* when we error this task, we need to unschedule all dependant tasks */
UnscheduleDependantTasks(task->taskid); UnscheduleDependantTasks(task->jobid, task->taskid);
} }
else else
{ {

View File

@ -10,37 +10,15 @@
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
/* pg_catalog.citus_wait_for_rebalance_job(jobid bigint) */ /* pg_catalog.citus_job_wait(jobid bigint) */
PG_FUNCTION_INFO_V1(citus_wait_for_rebalance_job); PG_FUNCTION_INFO_V1(citus_jobs_wait);
Datum Datum
citus_wait_for_rebalance_job(PG_FUNCTION_ARGS) citus_jobs_wait(PG_FUNCTION_ARGS)
{ {
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
int64 jobid = PG_GETARG_INT64(0); /* int64 jobid = PG_GETARG_INT64(0); */
ereport(ERROR, (errmsg("not implemented")));
for (;;)
{
CHECK_FOR_INTERRUPTS();
BackgroundTask *job = GetBackgroundTaskByTaskId(jobid);
if (!job)
{
ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid)));
}
if (IsCitusTaskStatusTerminal(job->status))
{
PG_RETURN_VOID();
}
/* not finished, sleeping */
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1000, /* TODO, what timeout would be good here */
WAIT_EVENT_PG_SLEEP);
ResetLatch(MyLatch);
}
} }

View File

@ -227,6 +227,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void); extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistBackgroundJobsRelationId(void);
extern Oid DistBackgroundTasksRelationId(void); extern Oid DistBackgroundTasksRelationId(void);
extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistRebalanceStrategyRelationId(void);
extern Oid DistLocalGroupIdRelationId(void); extern Oid DistLocalGroupIdRelationId(void);
@ -274,6 +275,11 @@ extern Oid SecondaryNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void); extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void); extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void); extern Oid BinaryCopyFormatId(void);
extern Oid CitusJobStatusScheduledId(void);
extern Oid CitusJobStatusRunningId(void);
extern Oid CitusJobStatusFinisehdId(void);
extern Oid CitusJobStatusCancelledId(void);
extern Oid CitusJobStatusFailedId(void);
extern Oid CitusTaskStatusScheduledId(void); extern Oid CitusTaskStatusScheduledId(void);
extern Oid CitusTaskStatusRunningId(void); extern Oid CitusTaskStatusRunningId(void);
extern Oid CitusTaskStatusDoneId(void); extern Oid CitusTaskStatusDoneId(void);

View File

@ -217,6 +217,7 @@ typedef enum BackgroundTaskStatus
typedef struct BackgroundTask typedef struct BackgroundTask
{ {
int64 jobid;
int64 taskid; int64 taskid;
int32 *pid; int32 *pid;
BackgroundTaskStatus status; BackgroundTaskStatus status;
@ -334,19 +335,22 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledBackgroundTask(void); extern bool HasScheduledBackgroundTask(void);
extern int64 GetNextBackgroundJobsJobId(void);
extern int64 GetNextBackgroundTaskTaskId(void); extern int64 GetNextBackgroundTaskTaskId(void);
extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskCount, extern int64 CreateBackgroundJob(const char *jobType, const char *description);
extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, char *command,
int dependingTaskCount,
int64 dependingTaskIds[]); int64 dependingTaskIds[]);
extern bool BackgroundTaskHasUmnetDependencies(int64 taskId); extern bool BackgroundTaskHasUmnetDependencies(int64 jobId, int64 taskId);
extern BackgroundTask * GetRunnableBackgroundTask(void); extern BackgroundTask * GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void); extern void ResetRunningBackgroundTasks(void);
extern void DeepFreeBackgroundTask(BackgroundTask *task); extern void DeepFreeBackgroundTask(BackgroundTask *task);
extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); extern BackgroundTask * GetBackgroundTaskByTaskId(int64 jobId, int64 taskId);
extern void UpdateBackgroundTask(BackgroundTask *task); extern void UpdateBackgroundTask(BackgroundTask *task);
extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status,
const int32 *retry_count, char *message); const int32 *retry_count, char *message);
extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata); extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata);
extern void UnscheduleDependantTasks(int64 taskId); extern void UnscheduleDependantTasks(int64 jobId, int64 taskId);
extern bool IsCitusTaskStatusTerminal(BackgroundTaskStatus status); extern bool IsCitusTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid CitusTaskStatusOid(BackgroundTaskStatus status); extern Oid CitusTaskStatusOid(BackgroundTaskStatus status);
#endif /* METADATA_UTILITY_H */ #endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,20 @@
#ifndef CITUS_PG_DIST_BACKGROUND_JOBS_H
#define CITUS_PG_DIST_BACKGROUND_JOBS_H
/* ----------------
* compiler constants for pg_dist_background_jobs
* ----------------
*/
#define Natts_pg_dist_background_jobs 6
#define Anum_pg_dist_background_jobs_job_id 1
#define Anum_pg_dist_background_jobs_state 2
#define Anum_pg_dist_background_jobs_job_type 3
#define Anum_pg_dist_background_jobs_description 4
#define Anum_pg_dist_background_jobs_started_at 5
#define Anum_pg_dist_background_jobs_finished_at 6
#define PG_DIST_BACKGROUND_JOBS_JOB_ID_SEQUENCE_NAME \
"pg_catalog.pg_dist_background_jobs_job_id_seq"
#endif /* CITUS_PG_DIST_BACKGROUND_JOBS_H */

View File

@ -6,13 +6,14 @@
* compiler constants for pg_dist_background_tasks * compiler constants for pg_dist_background_tasks
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_background_tasks 6 #define Natts_pg_dist_background_tasks 7
#define Anum_pg_dist_background_tasks_task_id 1 #define Anum_pg_dist_background_tasks_job_id 1
#define Anum_pg_dist_background_tasks_pid 2 #define Anum_pg_dist_background_tasks_task_id 2
#define Anum_pg_dist_background_tasks_status 3 #define Anum_pg_dist_background_tasks_pid 3
#define Anum_pg_dist_background_tasks_command 4 #define Anum_pg_dist_background_tasks_status 4
#define Anum_pg_dist_background_tasks_retry_count 5 #define Anum_pg_dist_background_tasks_command 5
#define Anum_pg_dist_background_tasks_message 6 #define Anum_pg_dist_background_tasks_retry_count 6
#define Anum_pg_dist_background_tasks_message 7
#define PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME \ #define PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME \
"pg_catalog.pg_dist_background_tasks_task_id_seq" "pg_catalog.pg_dist_background_tasks_task_id_seq"

View File

@ -4,6 +4,7 @@
typedef struct FormData_pg_dist_background_tasks_depend typedef struct FormData_pg_dist_background_tasks_depend
{ {
int64 job_id;
int64 task_id; int64 task_id;
int64 depends_on; int64 depends_on;
#ifdef CATALOG_VARLEN /* variable-length fields start here */ #ifdef CATALOG_VARLEN /* variable-length fields start here */
@ -16,8 +17,9 @@ typedef FormData_pg_dist_background_tasks_depend *Form_pg_dist_background_tasks_
* compiler constants for pg_dist_background_tasks_depend * compiler constants for pg_dist_background_tasks_depend
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_background_tasks_depend 2 #define Natts_pg_dist_background_tasks_depend 3
#define Anum_pg_dist_background_tasks_depend_task_id 1 #define Anum_pg_dist_background_tasks_depend_job_id 1
#define Anum_pg_dist_background_tasks_depend_depends_on 2 #define Anum_pg_dist_background_tasks_depend_task_id 2
#define Anum_pg_dist_background_tasks_depend_depends_on 3
#endif /* CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H */ #endif /* CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H */