From 3cf14ee816f08aa008ce8810f4adb42545a0dd0d Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 18 Jul 2022 15:25:37 +0200 Subject: [PATCH] let rebalancer schedule moves in background --- .../distributed/metadata/metadata_utility.c | 60 +++++++++++++++++++ .../distributed/operations/shard_rebalancer.c | 30 +++++++++- src/include/distributed/metadata_utility.h | 2 + .../distributed/pg_dist_rebalance_jobs.h | 2 + 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 12144b152..19f89a8a4 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2315,6 +2315,66 @@ RebalanceJobStatusOid(RebalanceJobStatus status) } +int64 +GetNextRebalanceJobId(void) +{ + text *sequenceName = cstring_to_text(REBALANCE_JOB_JOBID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName, false); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + Datum jobIdOid = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + uint64 jobId = DatumGetInt64(jobIdOid); + + return jobId; +} + + +RebalanceJob * +ScheduleBackgrounRebalanceJob(char *command) +{ + Datum newValues[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool newNulls[Natts_pg_dist_rebalance_jobs] = { 0 }; + + memset(newNulls, true, sizeof(newNulls)); + + int64 jobid = GetNextRebalanceJobId(); + + newValues[Anum_pg_dist_rebalance_jobs_jobid - 1] = Int64GetDatum( + GetNextRebalanceJobId()); + newNulls[Anum_pg_dist_rebalance_jobs_jobid - 1] = false; + + newValues[Anum_pg_dist_rebalance_jobs_status - 1] = JobStatusScheduledId(); + newNulls[Anum_pg_dist_rebalance_jobs_status - 1] = false; + + newValues[Anum_pg_dist_rebalance_jobs_command - 1] = CStringGetTextDatum(command); + newNulls[Anum_pg_dist_rebalance_jobs_command - 1] = false; + + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + RowExclusiveLock); + HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceJobs), newValues, + newNulls); + CatalogTupleInsert(pgDistRebalanceJobs, newTuple); + table_close(pgDistRebalanceJobs, NoLock); + + RebalanceJob *job = palloc0(sizeof(RebalanceJob)); + + job->jobid = jobid; + job->status = REBALANCE_JOB_STATUS_SCHEDULED; + job->command = pstrdup(command); + + return job; +} + + RebalanceJob * GetScheduledRebalanceJob(void) { diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index ad7cccb0e..915056adc 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1555,12 +1555,38 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) return; } + /* find the name of the shard transfer mode to interpolate in the scheduled command */ + Datum shardTranferModeLabelDatum = + DirectFunctionCall1(enum_out, shardReplicationModeOid); + char *shardTranferModeLabel = DatumGetCString(shardTranferModeLabelDatum); + + /* schedule planned moves */ + PlacementUpdateEvent *move = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + foreach_ptr(move, placementUpdateList) + { + resetStringInfo(&buf); + + appendStringInfo(&buf, + "SELECT citus_move_shard_placement(%ld,%s,%u,%s,%u,%s)", + move->shardId, + quote_literal_cstr(move->sourceNode->workerName), + move->sourceNode->workerPort, + quote_literal_cstr(move->targetNode->workerName), + move->targetNode->workerPort, + quote_literal_cstr(shardTranferModeLabel)); + + ScheduleBackgrounRebalanceJob(buf.data); + } + /* * This uses the first relationId from the list, it's only used for display * purposes so it does not really matter which to show */ - ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); - FinalizeCurrentProgressMonitor(); + + /* ExecutePlacementUpdates(placementUpdateList, shardReplicationModeOid, "Moving"); */ + /* FinalizeCurrentProgressMonitor(); */ } diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 01197318a..5c11bbabe 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -329,6 +329,8 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasScheduledRebalanceJobs(void); +extern int64 GetNextRebalanceJobId(void); +extern RebalanceJob * ScheduleBackgrounRebalanceJob(char *command); extern RebalanceJob * GetScheduledRebalanceJob(void); extern RebalanceJob * GetScheduledRebalanceJobyJobID(int64 jobId); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); diff --git a/src/include/distributed/pg_dist_rebalance_jobs.h b/src/include/distributed/pg_dist_rebalance_jobs.h index cddd08fa9..a968d1b89 100644 --- a/src/include/distributed/pg_dist_rebalance_jobs.h +++ b/src/include/distributed/pg_dist_rebalance_jobs.h @@ -35,4 +35,6 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job; #define Anum_pg_dist_rebalance_jobs_retry_count 4 #define Anum_pg_dist_rebalance_jobs_message 5 +#define REBALANCE_JOB_JOBID_SEQUENCE_NAME "pg_catalog.pg_dist_rebalance_jobs_jobid_seq" + #endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */