let rebalancer schedule moves in background

background-job-details
Nils Dijk 2022-07-18 15:25:37 +02:00 committed by Jelte Fennema
parent 5e7a4edd64
commit 3cf14ee816
4 changed files with 92 additions and 2 deletions

View File

@ -2315,6 +2315,66 @@ RebalanceJobStatusOid(RebalanceJobStatus status)
}
int64
GetNextRebalanceJobId(void)
{
text *sequenceName = cstring_to_text(REBALANCE_JOB_JOBID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
uint64 jobId = DatumGetInt64(jobIdOid);
return jobId;
}
RebalanceJob *
ScheduleBackgrounRebalanceJob(char *command)
{
Datum newValues[Natts_pg_dist_rebalance_jobs] = { 0 };
bool newNulls[Natts_pg_dist_rebalance_jobs] = { 0 };
memset(newNulls, true, sizeof(newNulls));
int64 jobid = GetNextRebalanceJobId();
newValues[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum(
GetNextRebalanceJobId());
newNulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false;
newValues[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId();
newNulls[Anum_pg_dist_rebalance_jobs_status - 1] = false;
newValues[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command);
newNulls[Anum_pg_dist_rebalance_jobs_command - 1] = false;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
RowExclusiveLock);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), newValues,
newNulls);
CatalogTupleInsert(pgDistRebalanceJobs, newTuple);
table_close(pgDistRebalanceJobs, NoLock);
RebalanceJob *job = palloc0(sizeof(RebalanceJob));
job->jobid = jobid;
job->status = REBALANCE_JOB_STATUS_SCHEDULED;
job->command = pstrdup(command);
return job;
}
RebalanceJob *
GetScheduledRebalanceJob(void)
{

View File

@ -1555,12 +1555,38 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
return;
}
/* find the name of the shard transfer mode to interpolate in the scheduled command */
Datum shardTranferModeLabelDatum =
DirectFunctionCall1(enum_out, shardReplicationModeOid);
char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum);
/* schedule planned moves */
PlacementUpdateEvent *move = NULL;
StringInfoData buf = { 0 };
initStringInfo(&buf);
foreach_ptr(move, placementUpdateList)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"SELECT citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)",
move->shardId,
quote_literal_cstr(move->sourceNode->workerName),
move->sourceNode->workerPort,
quote_literal_cstr(move->targetNode->workerName),
move->targetNode->workerPort,
quote_literal_cstr(shardTranferModeLabel));
ScheduleBackgrounRebalanceJob(buf.data);
}
/*
* This uses the first relationId from the list, it's only used for display
* purposes so it does not really matter which to show
*/
ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving");
FinalizeCurrentProgressMonitor();
/* ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); */
/* FinalizeCurrentProgressMonitor(); */
}

View File

@ -329,6 +329,8 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern int64 GetNextRebalanceJobId(void);
extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);

View File

@ -35,4 +35,6 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job;
#define Anum_pg_dist_rebalance_jobs_retry_count 4
#define Anum_pg_dist_rebalance_jobs_message 5
#define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq"
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */