add dependency tracking between rebalance jobs

background-job-details
Nils Dijk 2022-07-20 18:11:29 +02:00 committed by Jelte Fennema
parent 6fa08619cf
commit aee5dba634
7 changed files with 114 additions and 28 deletions

View File

@ -144,6 +144,7 @@ typedef struct MetadataCacheData
Oid distRebalanceJobsRelationId;
Oid distRebalanceJobsJobsIndexId;
Oid distRebalanceJobsStatusJobsIndexId;
Oid distRebalanceJobsDependRelationId;
Oid jobStatusScheduledId;
Oid jobStatusRunningId;
Oid jobStatusDoneId;
@ -2401,6 +2402,16 @@ DistRebalanceJobsStatusJobsIdIndexId(void)
}
Oid
DistRebalanceJobsDependRelationId(void)
{
CachedRelationLookup("pg_dist_rebalance_jobs_depend",
&MetadataCache.distRebalanceJobsDependRelationId);
return MetadataCache.distRebalanceJobsDependRelationId;
}
/* return oid of pg_dist_rebalance_strategy relation */
Oid
DistRebalanceStrategyRelationId(void)

View File

@ -46,6 +46,7 @@
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_rebalance_jobs.h"
#include "distributed/pg_dist_rebalance_jobs_depend.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h"
@ -2360,38 +2361,81 @@ GetNextRebalanceJobId(void)
RebalanceJob *
ScheduleBackgrounRebalanceJob(char *command)
ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[])
{
Datum newValues[Natts_pg_dist_rebalance_jobs] = { 0 };
bool newNulls[Natts_pg_dist_rebalance_jobs] = { 0 };
memset(newNulls, true, sizeof(newNulls));
int64 jobid = GetNextRebalanceJobId();
newValues[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(
GetNextRebalanceJobId());
newNulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false;
newValues[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId();
newNulls[Anum_pg_dist_rebalance_jobs_status - 1] = false;
newValues[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command);
newNulls[Anum_pg_dist_rebalance_jobs_command - 1] = false;
RebalanceJob *job = NULL;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
RowExclusiveLock);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), newValues,
newNulls);
CatalogTupleInsert(pgDistRebalanceJobs, newTuple);
Relation pgDistRebalanceJobsDepend = NULL;
if (dependingJobCount > 0)
{
pgDistRebalanceJobsDepend = table_open(DistRebalanceJobsDependRelationId(),
RowExclusiveLock);
}
/* 1. TODO verify depending jobs exist and lock them */
/* 2. insert new job */
{
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
bool nulls[Natts_pg_dist_rebalance_jobs] = { 0 };
memset(nulls, true, sizeof(nulls));
int64 jobid = GetNextRebalanceJobId();
values[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(jobid);
nulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false;
values[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId();
nulls[Anum_pg_dist_rebalance_jobs_status - 1] = false;
values[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command);
nulls[Anum_pg_dist_rebalance_jobs_command - 1] = false;
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs),
values, nulls);
CatalogTupleInsert(pgDistRebalanceJobs, newTuple);
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid;
job->status = REBALANCE_JOB_STATUS_SCHEDULED;
job->command = pstrdup(command);
}
/* 3. insert dependencies into catalog */
{
for (int i = 0; i < dependingJobCount; i++)
{
Assert(pgDistRebalanceJobsDepend != NULL);
Datum values[Natts_pg_dist_rebalance_jobs_depend] = { 0 };
bool nulls[Natts_pg_dist_rebalance_jobs_depend] = { 0 };
memset(nulls, true, sizeof(nulls));
values[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] =
Int64GetDatum(job->jobid);
nulls[Anum_pg_dist_rebalance_jobs_depend_jobid - 1] = false;
values[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] =
Int64GetDatum(dependingJobIds[i]);
nulls[Anum_pg_dist_rebalance_jobs_depend_depends_on - 1] = false;
HeapTuple newTuple = heap_form_tuple(
RelationGetDescr(pgDistRebalanceJobsDepend), values, nulls);
CatalogTupleInsert(pgDistRebalanceJobsDepend, newTuple);
}
}
if (pgDistRebalanceJobsDepend)
{
table_close(pgDistRebalanceJobsDepend, NoLock);
}
table_close(pgDistRebalanceJobs, NoLock);
RebalanceJob *job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid;
job->status = REBALANCE_JOB_STATUS_SCHEDULED;
job->command = pstrdup(command);
return job;
}

View File

@ -1564,6 +1564,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
PlacementUpdateEvent *move = NULL;
StringInfoData buf = { 0 };
initStringInfo(&buf);
bool first = true;
int64 prevJobId = 0;
foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
@ -1577,7 +1579,10 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
move->targetNode->workerPort,
quote_literal_cstr(shardTranferModeLabel));
ScheduleBackgrounRebalanceJob(buf.data);
RebalanceJob *job = ScheduleBackgrounRebalanceJob(buf.data, first ? 0 : 1,
&prevJobId);
prevJobId = job->jobid;
first = false;
}
/*

View File

@ -93,4 +93,15 @@ ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid);
CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid);
CREATE TABLE citus.pg_dist_rebalance_jobs_depend(
jobid bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE,
depends_on bigint NOT NULL REFERENCES pg_catalog.pg_dist_rebalance_jobs(jobid) ON DELETE CASCADE,
UNIQUE(jobid, depends_on)
);
ALTER TABLE citus.pg_dist_rebalance_jobs_depend SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_rebalance_jobs_depend_jobid ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(jobid);
CREATE INDEX pg_dist_rebalance_jobs_depend_depends_on ON pg_catalog.pg_dist_rebalance_jobs_depend USING btree(depends_on);
#include "udfs/citus_wait_for_rebalance_job/11.1-1.sql"

View File

@ -239,6 +239,7 @@ extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistRebalanceJobsJobsIdIndexId(void);
extern Oid DistRebalanceJobsStatusJobsIdIndexId(void);
extern Oid DistRebalanceJobsDependRelationId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistPlacementShardidIndexId(void);

View File

@ -331,7 +331,8 @@ extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command, int dependingJobCount,
int64 dependingJobIds[]);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern void ResetRunningJobs(void);
extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);

View File

@ -0,0 +1,13 @@
#ifndef CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
#define CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H
/* ----------------
* compiler constants for pg_dist_rebalance_jobs_depend
* ----------------
*/
#define Natts_pg_dist_rebalance_jobs_depend 2
#define Anum_pg_dist_rebalance_jobs_depend_jobid 1
#define Anum_pg_dist_rebalance_jobs_depend_depends_on 2
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_DEPEND_H */