From a23d340fbead6eec585c2cf2e21393f1c7a8c79f Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 19 Aug 2022 16:08:02 +0200 Subject: [PATCH] add jobs tracking table --- .../distributed/metadata/metadata_cache.c | 81 ++++++++ .../distributed/metadata/metadata_utility.c | 178 ++++++++++++++---- .../distributed/operations/shard_rebalancer.c | 13 +- .../distributed/sql/citus--11.0-4--11.1-1.sql | 33 +++- .../sql/udfs/citus_job_wait/11.1-1.sql | 8 + .../sql/udfs/citus_job_wait/latest.sql | 8 + .../citus_wait_for_rebalance_job/11.1-1.sql | 8 - .../citus_wait_for_rebalance_job/latest.sql | 8 - .../distributed/utils/background_jobs.c | 2 +- src/backend/distributed/utils/rebalance_job.c | 32 +--- src/include/distributed/metadata_cache.h | 6 + src/include/distributed/metadata_utility.h | 12 +- .../distributed/pg_dist_background_jobs.h | 20 ++ .../distributed/pg_dist_background_tasks.h | 15 +- .../pg_dist_backrgound_tasks_depend.h | 8 +- 15 files changed, 322 insertions(+), 110 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_job_wait/latest.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql create mode 100644 src/include/distributed/pg_dist_background_jobs.h diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 7c1f44113..cbcb2f396 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -141,12 +141,18 @@ typedef struct MetadataCacheData bool extensionLoaded; Oid distShardRelationId; Oid distPlacementRelationId; + Oid distBackgroundJobsRelationId; Oid distBackgroundTasksRelationId; Oid distBackgroundTasksTaskIdIndexId; Oid distBackgroundTasksStatusTaskIdIndexId; Oid distBackgroundTasksDependRelationId; Oid 
distBackgroundTasksDependTaskIdIndexId; Oid distBackgroundTasksDependDependsOnIndexId; + Oid citusJobStatusScheduledId; + Oid citusJobStatusRunningId; + Oid citusJobStatusFinishedId; + Oid citusJobStatusCancelledId; + Oid citusJobStatusFailedId; Oid citusTaskStatusScheduledId; Oid citusTaskStatusRunningId; Oid citusTaskStatusDoneId; @@ -2375,6 +2381,16 @@ DistLocalGroupIdRelationId(void) } +Oid +DistBackgroundJobsRelationId(void) +{ + CachedRelationLookup("pg_dist_background_jobs", + &MetadataCache.distBackgroundJobsRelationId); + + return MetadataCache.distBackgroundJobsRelationId; +} + + Oid DistBackgroundTasksRelationId(void) { @@ -3157,6 +3173,71 @@ SecondaryNodeRoleId(void) } +Oid +CitusJobStatusScheduledId(void) +{ + if (!MetadataCache.citusJobStatusScheduledId) + { + MetadataCache.citusJobStatusScheduledId = + LookupStringEnumValueId("citus_job_status", "scheduled"); + } + + return MetadataCache.citusJobStatusScheduledId; +} + + +Oid +CitusJobStatusRunningId(void) +{ + if (!MetadataCache.citusJobStatusRunningId) + { + MetadataCache.citusJobStatusRunningId = + LookupStringEnumValueId("citus_job_status", "running"); + } + + return MetadataCache.citusJobStatusRunningId; +} + + +Oid +CitusJobStatusFinisehdId(void) +{ + if (!MetadataCache.citusJobStatusFinishedId) + { + MetadataCache.citusJobStatusFinishedId = + LookupStringEnumValueId("citus_job_status", "finished"); + } + + return MetadataCache.citusJobStatusFinishedId; +} + + +Oid +CitusJobStatusCancelledId(void) +{ + if (!MetadataCache.citusJobStatusCancelledId) + { + MetadataCache.citusJobStatusCancelledId = + LookupStringEnumValueId("citus_job_status", "cancelled"); + } + + return MetadataCache.citusJobStatusCancelledId; +} + + +Oid +CitusJobStatusFailedId(void) +{ + if (!MetadataCache.citusJobStatusFailedId) + { + MetadataCache.citusJobStatusFailedId = + LookupStringEnumValueId("citus_job_status", "failed"); + } + + return MetadataCache.citusJobStatusFailedId; +} + + Oid CitusTaskStatusScheduledId(void) { 
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 27fc24974..84d1a60bf 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -43,10 +43,11 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" -#include "distributed/pg_dist_colocation.h" -#include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_background_jobs.h" #include "distributed/pg_dist_background_tasks.h" #include "distributed/pg_dist_backrgound_tasks_depend.h" +#include "distributed/pg_dist_colocation.h" +#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" #include "distributed/reference_table_utils.h" @@ -2352,6 +2353,29 @@ CitusTaskStatusOid(BackgroundTaskStatus status) } +int64 +GetNextBackgroundJobsJobId(void) +{ + text *sequenceName = cstring_to_text(PG_DIST_BACKGROUND_JOBS_JOB_ID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName, false); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + uint64 jobId = DatumGetInt64(jobIdOid); + + return jobId; +} + + int64 GetNextBackgroundTaskTaskId(void) { @@ -2375,8 +2399,53 @@ GetNextBackgroundTaskTaskId(void) } +int64 +CreateBackgroundJob(const char *jobType, const char *description) +{ + Relation pgDistBackgroundJobs = + table_open(DistBackgroundJobsRelationId(), RowExclusiveLock); + + /* insert new job */ + Datum 
values[Natts_pg_dist_background_jobs] = { 0 }; + bool nulls[Natts_pg_dist_background_jobs] = { 0 }; + + memset(nulls, true, sizeof(nulls)); + + int64 jobId = GetNextBackgroundJobsJobId(); + + values[Anum_pg_dist_background_jobs_job_id - 1] = Int64GetDatum(jobId); + nulls[Anum_pg_dist_background_jobs_job_id - 1] = false; + + values[Anum_pg_dist_background_jobs_state - 1] = CitusJobStatusScheduledId(); + nulls[Anum_pg_dist_background_jobs_state - 1] = false; + + if (jobType) + { + NameData jobTypeName = { 0 }; + namestrcpy(&jobTypeName, jobType); + values[Anum_pg_dist_background_jobs_job_type - 1] = NameGetDatum(&jobTypeName); + nulls[Anum_pg_dist_background_jobs_job_type - 1] = false; + } + + if (description) + { + values[Anum_pg_dist_background_jobs_description - 1] = + CStringGetTextDatum(description); + nulls[Anum_pg_dist_background_jobs_description - 1] = false; + } + + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundJobs), + values, nulls); + CatalogTupleInsert(pgDistBackgroundJobs, newTuple); + + table_close(pgDistBackgroundJobs, NoLock); + + return jobId; +} + + BackgroundTask * -ScheduleBackgroundTask(char *command, int dependingTaskCount, +ScheduleBackgroundTask(int64 jobId, char *command, int dependingTaskCount, int64 dependingTaskIds[]) { BackgroundTask *task = NULL; @@ -2401,6 +2470,9 @@ ScheduleBackgroundTask(char *command, int dependingTaskCount, int64 taskId = GetNextBackgroundTaskTaskId(); + values[Anum_pg_dist_background_tasks_job_id - 1] = Int64GetDatum(jobId); + nulls[Anum_pg_dist_background_tasks_job_id - 1] = false; + values[Anum_pg_dist_background_tasks_task_id - 1] = Int64GetDatum(taskId); nulls[Anum_pg_dist_background_tasks_task_id - 1] = false; @@ -2430,6 +2502,10 @@ ScheduleBackgroundTask(char *command, int dependingTaskCount, bool nulls[Natts_pg_dist_background_tasks_depend] = { 0 }; memset(nulls, true, sizeof(nulls)); + values[Anum_pg_dist_background_tasks_depend_job_id - 1] = + Int64GetDatum(jobId); + 
nulls[Anum_pg_dist_background_tasks_depend_job_id - 1] = false; + values[Anum_pg_dist_background_tasks_depend_task_id - 1] = Int64GetDatum(task->taskid); nulls[Anum_pg_dist_background_tasks_depend_task_id - 1] = false; @@ -2543,6 +2619,7 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); BackgroundTask *task = palloc0(sizeof(BackgroundTask)); + task->jobid = DatumGetInt64(values[Anum_pg_dist_background_tasks_job_id - 1]); task->taskid = DatumGetInt64(values[Anum_pg_dist_background_tasks_task_id - 1]); if (!nulls[Anum_pg_dist_background_tasks_pid - 1]) { @@ -2573,25 +2650,28 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) bool -BackgroundTaskHasUmnetDependencies(int64 taskId) +BackgroundTaskHasUmnetDependencies(int64 jobId, int64 taskId) { bool hasUnmetDependency = false; Relation pgDistBackgroundTasksDepend = table_open(DistBackgroundTasksDependRelationId(), AccessShareLock); - const int scanKeyCount = 1; - ScanKeyData scanKey[1] = { 0 }; + ScanKeyData scanKey[2] = { 0 }; bool indexOK = true; + /* pg_catalog.pg_dist_background_tasks_depend.job_id = jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_job_id, + BTEqualStrategyNumber, F_INT8EQ, jobId); + /* pg_catalog.pg_dist_background_tasks_depend.task_id = $taskId */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_task_id, + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_depend_task_id, BTEqualStrategyNumber, F_INT8EQ, taskId); SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasksDepend, DistBackgroundTasksDependTaskIdIndexId(), - indexOK, NULL, scanKeyCount, + indexOK, NULL, lengthof(scanKey), scanKey); HeapTuple dependTuple = NULL; @@ -2600,7 +2680,8 @@ BackgroundTaskHasUmnetDependencies(int64 taskId) Form_pg_dist_background_tasks_depend depends = (Form_pg_dist_background_tasks_depend) GETSTRUCT(dependTuple); - BackgroundTask 
*dependingJob = GetBackgroundTaskByTaskId(depends->depends_on); + BackgroundTask *dependingJob = GetBackgroundTaskByTaskId(jobId, + depends->depends_on); /* * Only when the status of all depending jobs is done we clear this job and say @@ -2664,7 +2745,7 @@ GetRunnableBackgroundTask(void) while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple); - if (!BackgroundTaskHasUmnetDependencies(task->taskid)) + if (!BackgroundTaskHasUmnetDependencies(task->jobid, task->taskid)) { /* found task, close table and return */ break; @@ -2684,23 +2765,26 @@ GetRunnableBackgroundTask(void) BackgroundTask * -GetBackgroundTaskByTaskId(int64 taskId) +GetBackgroundTaskByTaskId(int64 jobId, int64 taskId) { - const int scanKeyCount = 1; - ScanKeyData scanKey[1]; + ScanKeyData scanKey[2] = { 0 }; bool indexOK = true; Relation pgDistBackgroundTasks = table_open(DistBackgroundTasksRelationId(), AccessShareLock); + /* pg_dist_background_tasks.job_id == $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + /* pg_dist_background_tasks.task_id == $taskId */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, - DistBackgroundTasksStatusTaskIdIndexId(), - indexOK, NULL, scanKeyCount, scanKey); + DistBackgroundTasksTaskIdIndexId(), + indexOK, NULL, lengthof(scanKey), scanKey); HeapTuple taskTuple = systable_getnext(scanDescriptor); BackgroundTask *task = NULL; @@ -2724,24 +2808,28 @@ UpdateBackgroundTask(BackgroundTask *task) table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); - int scanKeyCount = 1; - ScanKeyData scanKey[1] = { 0 }; + ScanKeyData 
scanKey[2] = { 0 }; const bool indexOK = true; + /* WHERE job_id = $task->jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->jobid)); + /* WHERE task_id = $task->taskid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(task->taskid)); SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, DistBackgroundTasksTaskIdIndexId(), - indexOK, NULL, scanKeyCount, scanKey); + indexOK, NULL, lengthof(scanKey), scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) { - ereport(ERROR, (errmsg("could not find background task entry for task_id: " - UINT64_FORMAT, task->taskid))); + ereport(ERROR, (errmsg("could not find background task entry for :" + "job_id/task_id: " UINT64_FORMAT "/" UINT64_FORMAT, + task->jobid, task->taskid))); } Datum values[Natts_pg_dist_background_tasks] = { 0 }; @@ -2753,11 +2841,13 @@ UpdateBackgroundTask(BackgroundTask *task) bool updated = false; #define UPDATE_FIELD(field, newNull, newValue) \ - replace[(field - 1)] = (((newNull) != isnull[(field - 1)]) \ - || (values[(field - 1)] != (newValue))); \ - isnull[(field - 1)] = (newNull); \ - values[(field - 1)] = (newValue); \ - updated |= replace[(field - 1)] + { \ + replace[((field) - 1)] = (((newNull) != isnull[((field) - 1)]) \ + || (values[((field) - 1)] != (newValue))); \ + isnull[((field) - 1)] = (newNull); \ + values[((field) - 1)] = (newValue); \ + updated |= replace[((field) - 1)]; \ + } if (task->pid) { @@ -2836,24 +2926,27 @@ UpdateBackgroundTask(BackgroundTask *task) static List * -GetDependantTasks(int64 taskId) +GetDependantTasks(int64 jobId, int64 taskId) { Relation pgDistBackgroundTasksDepends = table_open(DistBackgroundTasksDependRelationId(), RowExclusiveLock); - ScanKeyData scanKey[1] = { 0 }; - int scanKeyCount = 1; + 
ScanKeyData scanKey[2] = { 0 }; const bool indexOK = true; + /* pg_dist_background_tasks_depend.job_id = $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + /* pg_dist_background_tasks_depend.depends_on = $taskId */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_depend_depends_on, + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_depend_depends_on, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(taskId)); SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasksDepends, DistBackgroundTasksDependDependsOnIndexId(), indexOK, - NULL, scanKeyCount, scanKey); + NULL, lengthof(scanKey), scanKey); List *dependantTasks = NIL; HeapTuple heapTuple = NULL; @@ -2876,13 +2969,13 @@ GetDependantTasks(int64 taskId) void -UnscheduleDependantTasks(int64 taskId) +UnscheduleDependantTasks(int64 jobId, int64 taskId) { Relation pgDistBackgroundTasks = table_open(DistBackgroundTasksRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); - List *dependantTasks = GetDependantTasks(taskId); + List *dependantTasks = GetDependantTasks(jobId, taskId); while (list_length(dependantTasks) > 0) { /* pop last item from stack */ @@ -2890,21 +2983,24 @@ UnscheduleDependantTasks(int64 taskId) dependantTasks = list_delete_last(dependantTasks); /* push new dependant tasks on to stack */ - dependantTasks = list_concat(dependantTasks, GetDependantTasks(cTaskId)); + dependantTasks = list_concat(dependantTasks, GetDependantTasks(jobId, cTaskId)); /* unschedule current task */ { - ScanKeyData scanKey[1] = { 0 }; - int scanKeyCount = 1; + ScanKeyData scanKey[2] = { 0 }; - /* WHERE taskId = job->taskId */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_task_id, + /* WHERE jobId = $jobId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_background_tasks_job_id, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobId)); + + /* WHERE taskId = 
task->taskId */ + ScanKeyInit(&scanKey[1], Anum_pg_dist_background_tasks_task_id, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(cTaskId)); const bool indexOK = true; SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, DistBackgroundTasksTaskIdIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); + indexOK, NULL, + lengthof(scanKey), scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 65611a5b5..232316e1c 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1560,7 +1560,14 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) DirectFunctionCall1(enum_out, shardReplicationModeOid); char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum); + if (list_length(placementUpdateList) == 0) + { + return; + } + /* schedule planned moves */ + int64 jobId = CreateBackgroundJob("rebalance", "Rebalance colocation group ..."); + PlacementUpdateEvent *move = NULL; StringInfoData buf = { 0 }; initStringInfo(&buf); @@ -1579,9 +1586,9 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) move->targetNode->workerPort, quote_literal_cstr(shardTranferModeLabel)); - BackgroundTask *job = ScheduleBackgroundTask(buf.data, first ? 0 : 1, - &prevJobId); - prevJobId = job->taskid; + BackgroundTask *task = ScheduleBackgroundTask(jobId, buf.data, first ? 
0 : 1, + &prevJobId); + prevJobId = task->taskid; first = false; } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index bd968e430..63f7bbf66 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -77,10 +77,25 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" +CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'running', 'finished', 'cancelled', 'failed'); +ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; + +CREATE TABLE citus.pg_dist_background_jobs ( + job_id bigserial NOT NULL, + state pg_catalog.citus_job_status DEFAULT 'scheduled' NOT NULL, + job_type name, + description text, + started_at timestamptz, + finished_at timestamptz +); +ALTER TABLE citus.pg_dist_background_jobs SET SCHEMA pg_catalog; +CREATE UNIQUE INDEX pg_dist_background_jobs_job_id_index ON pg_catalog.pg_dist_background_jobs using btree(job_id); + CREATE TYPE citus.citus_task_status AS ENUM ('scheduled', 'running', 'done', 'error', 'unscheduled'); ALTER TYPE citus.citus_task_status SET SCHEMA pg_catalog; CREATE TABLE citus.pg_dist_background_tasks( + job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_jobs(job_id), task_id bigserial NOT NULL, pid integer, status pg_catalog.citus_task_status default 'scheduled' NOT NULL, @@ -88,20 +103,22 @@ CREATE TABLE citus.pg_dist_background_tasks( retry_count integer, message text ); - ALTER TABLE citus.pg_dist_background_tasks SET SCHEMA pg_catalog; -CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(task_id); +CREATE UNIQUE INDEX pg_dist_background_tasks_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(job_id, 
task_id); CREATE INDEX pg_dist_background_tasks_status_task_id_index ON pg_catalog.pg_dist_background_tasks using btree(status, task_id); CREATE TABLE citus.pg_dist_background_tasks_depend( - task_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, - depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_tasks(task_id) ON DELETE CASCADE, + job_id bigint NOT NULL REFERENCES pg_catalog.pg_dist_background_jobs(job_id) ON DELETE CASCADE, + task_id bigint NOT NULL, + depends_on bigint NOT NULL, - UNIQUE(task_id, depends_on) + UNIQUE(job_id, task_id, depends_on), + FOREIGN KEY (job_id, task_id) REFERENCES pg_catalog.pg_dist_background_tasks (job_id, task_id) ON DELETE CASCADE, + FOREIGN KEY (job_id, depends_on) REFERENCES pg_catalog.pg_dist_background_tasks (job_id, task_id) ON DELETE CASCADE ); ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog; -CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(task_id); -CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(depends_on); +CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, task_id); +CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, depends_on); -#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" +#include "udfs/citus_job_wait/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql new file mode 100644 index 000000000..00e72913d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_wait/11.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_jobs_wait$$; +COMMENT ON FUNCTION 
pg_catalog.citus_jobs_wait(jobid bigint) + IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql b/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql new file mode 100644 index 000000000..00e72913d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_job_wait/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$citus_jobs_wait$$; +COMMENT ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) + IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; + +GRANT EXECUTE ON FUNCTION pg_catalog.citus_jobs_wait(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql deleted file mode 100644 index b35373dc9..000000000 --- a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/11.1-1.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) - RETURNS VOID - LANGUAGE C STRICT - AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$; -COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) - IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; - -GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql b/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql deleted file mode 100644 index b35373dc9..000000000 --- a/src/backend/distributed/sql/udfs/citus_wait_for_rebalance_job/latest.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE FUNCTION 
pg_catalog.citus_wait_for_rebalance_job(jobid bigint) - RETURNS VOID - LANGUAGE C STRICT - AS 'MODULE_PATHNAME',$$citus_wait_for_rebalance_job$$; -COMMENT ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) - IS 'blocks till the job identified by jobid is at a terminal state, errors if no such job exists'; - -GRANT EXECUTE ON FUNCTION pg_catalog.citus_wait_for_rebalance_job(jobid bigint) TO PUBLIC; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2106df6a7..657dbc90e 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -326,7 +326,7 @@ CitusBackgroundTaskMonitorMain(Datum arg) task->status = BACKGROUND_TASK_STATUS_ERROR; /* when we error this task, we need to unschedule all dependant tasks */ - UnscheduleDependantTasks(task->taskid); + UnscheduleDependantTasks(task->jobid, task->taskid); } else { diff --git a/src/backend/distributed/utils/rebalance_job.c b/src/backend/distributed/utils/rebalance_job.c index 52b5a3cd8..7194d53bb 100644 --- a/src/backend/distributed/utils/rebalance_job.c +++ b/src/backend/distributed/utils/rebalance_job.c @@ -10,37 +10,15 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_utility.h" -/* pg_catalog.citus_wait_for_rebalance_job(jobid bigint) */ -PG_FUNCTION_INFO_V1(citus_wait_for_rebalance_job); +/* pg_catalog.citus_jobs_wait(jobid bigint) */ +PG_FUNCTION_INFO_V1(citus_jobs_wait); Datum -citus_wait_for_rebalance_job(PG_FUNCTION_ARGS) +citus_jobs_wait(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureCoordinator(); - int64 jobid = PG_GETARG_INT64(0); - - for (;;) - { - CHECK_FOR_INTERRUPTS(); - - BackgroundTask *job = GetBackgroundTaskByTaskId(jobid); - if (!job) - { - ereport(ERROR, (errmsg("unkown job with jobid: %ld", jobid))); - } - - if (IsCitusTaskStatusTerminal(job->status)) - { - PG_RETURN_VOID(); - } - - /* not finished, sleeping */ - (void) WaitLatch(MyLatch, 
- WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - 1000, /* TODO, what timeout would be good here */ - WAIT_EVENT_PG_SLEEP); - ResetLatch(MyLatch); - } + /* int64 jobid = PG_GETARG_INT64(0); */ + ereport(ERROR, (errmsg("not implemented"))); } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 60927b81e..04b660956 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -227,6 +227,7 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); +extern Oid DistBackgroundJobsRelationId(void); extern Oid DistBackgroundTasksRelationId(void); extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistLocalGroupIdRelationId(void); @@ -274,6 +275,11 @@ extern Oid SecondaryNodeRoleId(void); extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); +extern Oid CitusJobStatusScheduledId(void); +extern Oid CitusJobStatusRunningId(void); +extern Oid CitusJobStatusFinisehdId(void); +extern Oid CitusJobStatusCancelledId(void); +extern Oid CitusJobStatusFailedId(void); extern Oid CitusTaskStatusScheduledId(void); extern Oid CitusTaskStatusRunningId(void); extern Oid CitusTaskStatusDoneId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 09efb9805..8d7de80a3 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -217,6 +217,7 @@ typedef enum BackgroundTaskStatus typedef struct BackgroundTask { + int64 jobid; int64 taskid; int32 *pid; BackgroundTaskStatus status; @@ -334,19 +335,22 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool 
HasScheduledBackgroundTask(void); +extern int64 GetNextBackgroundJobsJobId(void); extern int64 GetNextBackgroundTaskTaskId(void); -extern BackgroundTask * ScheduleBackgroundTask(char *command, int dependingTaskCount, +extern int64 CreateBackgroundJob(const char *jobType, const char *description); +extern BackgroundTask * ScheduleBackgroundTask(int64 jobId, char *command, + int dependingTaskCount, int64 dependingTaskIds[]); -extern bool BackgroundTaskHasUmnetDependencies(int64 taskId); +extern bool BackgroundTaskHasUmnetDependencies(int64 jobId, int64 taskId); extern BackgroundTask * GetRunnableBackgroundTask(void); extern void ResetRunningBackgroundTasks(void); extern void DeepFreeBackgroundTask(BackgroundTask *task); -extern BackgroundTask * GetBackgroundTaskByTaskId(int64 taskId); +extern BackgroundTask * GetBackgroundTaskByTaskId(int64 jobId, int64 taskId); extern void UpdateBackgroundTask(BackgroundTask *task); extern void UpdateJobStatus(int64 taskId, const pid_t *pid, BackgroundTaskStatus status, const int32 *retry_count, char *message); extern bool UpdateJobError(BackgroundTask *job, ErrorData *edata); -extern void UnscheduleDependantTasks(int64 taskId); +extern void UnscheduleDependantTasks(int64 jobId, int64 taskId); extern bool IsCitusTaskStatusTerminal(BackgroundTaskStatus status); extern Oid CitusTaskStatusOid(BackgroundTaskStatus status); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_background_jobs.h b/src/include/distributed/pg_dist_background_jobs.h new file mode 100644 index 000000000..2ab66006e --- /dev/null +++ b/src/include/distributed/pg_dist_background_jobs.h @@ -0,0 +1,20 @@ + +#ifndef CITUS_PG_DIST_BACKGROUND_JOBS_H +#define CITUS_PG_DIST_BACKGROUND_JOBS_H + +/* ---------------- + * compiler constants for pg_dist_background_jobs + * ---------------- + */ +#define Natts_pg_dist_background_jobs 6 +#define Anum_pg_dist_background_jobs_job_id 1 +#define Anum_pg_dist_background_jobs_state 2 +#define 
Anum_pg_dist_background_jobs_job_type 3 +#define Anum_pg_dist_background_jobs_description 4 +#define Anum_pg_dist_background_jobs_started_at 5 +#define Anum_pg_dist_background_jobs_finished_at 6 + +#define PG_DIST_BACKGROUND_JOBS_JOB_ID_SEQUENCE_NAME \ + "pg_catalog.pg_dist_background_jobs_job_id_seq" + +#endif /* CITUS_PG_DIST_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/pg_dist_background_tasks.h b/src/include/distributed/pg_dist_background_tasks.h index 2881bb38e..9250e4b66 100644 --- a/src/include/distributed/pg_dist_background_tasks.h +++ b/src/include/distributed/pg_dist_background_tasks.h @@ -6,13 +6,14 @@ * compiler constants for pg_dist_background_tasks * ---------------- */ -#define Natts_pg_dist_background_tasks 6 -#define Anum_pg_dist_background_tasks_task_id 1 -#define Anum_pg_dist_background_tasks_pid 2 -#define Anum_pg_dist_background_tasks_status 3 -#define Anum_pg_dist_background_tasks_command 4 -#define Anum_pg_dist_background_tasks_retry_count 5 -#define Anum_pg_dist_background_tasks_message 6 +#define Natts_pg_dist_background_tasks 7 +#define Anum_pg_dist_background_tasks_job_id 1 +#define Anum_pg_dist_background_tasks_task_id 2 +#define Anum_pg_dist_background_tasks_pid 3 +#define Anum_pg_dist_background_tasks_status 4 +#define Anum_pg_dist_background_tasks_command 5 +#define Anum_pg_dist_background_tasks_retry_count 6 +#define Anum_pg_dist_background_tasks_message 7 #define PG_DIST_BACKGROUND_TASK_TASK_ID_SEQUENCE_NAME \ "pg_catalog.pg_dist_background_tasks_task_id_seq" diff --git a/src/include/distributed/pg_dist_backrgound_tasks_depend.h b/src/include/distributed/pg_dist_backrgound_tasks_depend.h index 8ed7925b9..976814cd3 100644 --- a/src/include/distributed/pg_dist_backrgound_tasks_depend.h +++ b/src/include/distributed/pg_dist_backrgound_tasks_depend.h @@ -4,6 +4,7 @@ typedef struct FormData_pg_dist_background_tasks_depend { + int64 job_id; int64 task_id; int64 depends_on; #ifdef CATALOG_VARLEN /* variable-length fields start 
here */ @@ -16,8 +17,9 @@ typedef FormData_pg_dist_background_tasks_depend *Form_pg_dist_background_tasks_ * compiler constants for pg_dist_background_tasks_depend * ---------------- */ -#define Natts_pg_dist_background_tasks_depend 2 -#define Anum_pg_dist_background_tasks_depend_task_id 1 -#define Anum_pg_dist_background_tasks_depend_depends_on 2 +#define Natts_pg_dist_background_tasks_depend 3 +#define Anum_pg_dist_background_tasks_depend_job_id 1 +#define Anum_pg_dist_background_tasks_depend_task_id 2 +#define Anum_pg_dist_background_tasks_depend_depends_on 3 #endif /* CITUS_PG_DIST_BACKGROUND_TASKS_DEPEND_H */