From aee5dba6346ead687a4ee8d3450e91e09499f8e1 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 20 Jul 2022 18:11:29 +0200 Subject: [PATCH] add dependency tracking between rebalance jobs --- .../distributed/metadata/metadata_cache.c | 11 +++ .../distributed/metadata/metadata_utility.c | 96 ++++++++++++++----- .../distributed/operations/shard_rebalancer.c | 7 +- .../distributed/sql/citus--11.0-4--11.1-1.sql | 11 +++ src/include/distributed/metadata_cache.h | 1 + src/include/distributed/metadata_utility.h | 3 +- .../pg_dist_rebalance_jobs_depend.h | 13 +++ 7 files changed, 114 insertions(+), 28 deletions(-) create mode 100644 src/include/distributed/pg_dist_rebalance_jobs_depend.h diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 491046918..802b699d2 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -144,6 +144,7 @@ typedef struct MetadataCacheData Oid distRebalanceJobsRelationId; Oid distRebalanceJobsJobsIndexId; Oid distRebalanceJobsStatusJobsIndexId; + Oid distRebalanceJobsDependRelationId; Oid jobStatusScheduledId; Oid jobStatusRunningId; Oid jobStatusDoneId; @@ -2401,6 +2402,16 @@ DistRebalanceJobsStatusJobsIdIndexId(void) } +Oid +DistRebalanceJobsDependRelationId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs_depend", + &MetadataCache.distRebalanceJobsDependRelationId); + + return MetadataCache.distRebalanceJobsDependRelationId; +} + + /* return oid of pg_dist_rebalance_strategy relation */ Oid DistRebalanceStrategyRelationId(void) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 711567542..957303502 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -46,6 +46,7 @@ #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_rebalance_jobs.h" +#include "distributed/pg_dist_rebalance_jobs_depend.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" #include "distributed/reference_table_utils.h" @@ -2360,38 +2361,81 @@ GetNextRebalanceJobId(void) RebalanceJob * -ScheduleBackgrounRebalanceJob(char *command) +ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, + int64 dependingJobIds[]) { - Datum newValues[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool newNulls[Natts_pg_dist_rebalance_jobs] = { 0 }; - - memset(newNulls, true, sizeof(newNulls)); - - int64 jobid = GetNextRebalanceJobId(); - - newValues[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum( - GetNextRebalanceJobId()); - newNulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false; - - newValues[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId(); - newNulls[Anum_pg_dist_rebalance_jobs_status - 1] = false; - - newValues[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command); - newNulls[Anum_pg_dist_rebalance_jobs_command - 1] = false; + RebalanceJob *job = NULL; Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), RowExclusiveLock); - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), newValues, - newNulls); - CatalogTupleInsert(pgDistRebalanceJobs, newTuple); + Relation pgDistRebalanceJobsDepend = NULL; + if (dependingJobCount > 0) + { + pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(), + RowExclusiveLock); + } + + /* 1. TODO verify depending jobs exist and lock them */ + + /* 2. insert new job */ + { + Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool nulls[Natts_pg_dist_rebalance_jobs] = { 0 }; + + memset(nulls, true, sizeof(nulls)); + + int64 jobid = GetNextRebalanceJobId(); + + values[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(jobid); + nulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false; + + values[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId(); + nulls[Anum_pg_dist_rebalance_jobs_status - 1] = false; + + values[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command); + nulls[Anum_pg_dist_rebalance_jobs_command - 1] = false; + + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), + values, nulls); + CatalogTupleInsert(pgDistRebalanceJobs, newTuple); + + job = palloc0(sizeof(RebalanceJob)); + + job->jobid = jobid; + job->status = REBALANCE_JOB_STATUS_SCHEDULED; + job->command = pstrdup(command); + } + + /* 3. insert dependencies into catalog */ + { + for (int i = 0; i < dependingJobCount; i++) + { + Assert(pgDistRebalanceJobsDepend != NULL); + + Datum values[Natts_pg_dist_rebalance_jobs_depend] = { 0 }; + bool nulls[Natts_pg_dist_rebalance_jobs_depend] = { 0 }; + memset(nulls, true, sizeof(nulls)); + + values[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = + Int64GetDatum(job->jobid); + nulls[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = false; + + values[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = + Int64GetDatum(dependingJobIds[i]); + nulls[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = false; + + HeapTuple newTuple = heap_form_tuple( + RelationGetDescr(pgDistRebalanceJobsDepend), values, nulls); + CatalogTupleInsert(pgDistRebalanceJobsDepend, newTuple); + } + } + + if (pgDistRebalanceJobsDepend) + { + table_close(pgDistRebalanceJobsDepend, NoLock); + } table_close(pgDistRebalanceJobs, NoLock); - RebalanceJob *job = palloc0(sizeof(RebalanceJob)); - - job->jobid = jobid; - job->status = REBALANCE_JOB_STATUS_SCHEDULED; - job->command = pstrdup(command); - return job; } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 915056adc..0d2465a35 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1564,6 +1564,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) PlacementUpdateEvent *move = NULL; StringInfoData buf = { 0 }; initStringInfo(&buf); + bool first = true; + int64 prevJobId = 0; foreach_ptr(move, placementUpdateList) { resetStringInfo(&buf); @@ -1577,7 +1579,10 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) move->targetNode->workerPort, quote_literal_cstr(shardTranferModeLabel)); - ScheduleBackgrounRebalanceJob(buf.data); + RebalanceJob *job = ScheduleBackgrounRebalanceJob(buf.data, first ? 0 : 1, + &prevJobId); + prevJobId = job->jobid; + first = false; } /* diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 933b3523a..c171efaa7 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -93,4 +93,15 @@ ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog; CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid); CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid); +CREATE TABLE citus.pg_dist_rebalance_jobs_depend( + jobid bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, + depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE, + + UNIQUE(jobid, depends_on) +); + +ALTER TABLE citus.pg_dist_rebalance_jobs_depend SET SCHEMA pg_catalog; +CREATE INDEX pg_dist_rebalance_jobs_depend_jobid ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(jobid); +CREATE INDEX pg_dist_rebalance_jobs_depend_depends_on ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(depends_on); + #include "udfs/citus_wait_for_rebalance_job/11.1-1.sql" diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 26b63bee2..4986a47ba 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -239,6 +239,7 @@ extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionColocationidIndexId(void); extern Oid DistRebalanceJobsJobsIdIndexId(void); extern Oid DistRebalanceJobsStatusJobsIdIndexId(void); +extern Oid DistRebalanceJobsDependRelationId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 6bd0dadf3..ebdc8b458 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -331,7 +331,8 @@ extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasScheduledRebalanceJobs(void); extern int64 GetNextRebalanceJobId(void); -extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command); +extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount, + int64 dependingJobIds[]); extern RebalanceJob * GetScheduledRebalanceJob(void); extern void ResetRunningJobs(void); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); diff --git a/src/include/distributed/pg_dist_rebalance_jobs_depend.h b/src/include/distributed/pg_dist_rebalance_jobs_depend.h new file mode 100644 index 000000000..34f94d0e8 --- /dev/null +++ b/src/include/distributed/pg_dist_rebalance_jobs_depend.h @@ -0,0 +1,13 @@ + +#ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H +#define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H + +/* ---------------- + * compiler constants for pg_dist_rebalance_jobs_depend + * ---------------- + */ +#define Natts_pg_dist_rebalance_jobs_depend 2 +#define Anum_pg_dist_rebalance_jobs_depend_jobid 1 +#define Anum_pg_dist_rebalance_jobs_depend_depends_on 2 + +#endif /* CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H */