diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index bc67912a1..a3de143ec 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -40,6 +40,7 @@ #include "distributed/shard_cleaner.h" #include "distributed/metadata_sync.h" #include "distributed/query_stats.h" +#include "distributed/repair_shards.h" #include "distributed/statistics_collection.h" #include "distributed/transaction_recovery.h" #include "distributed/version_compat.h" @@ -54,6 +55,7 @@ #include "storage/lwlock.h" #include "tcop/tcopprot.h" #include "common/hashfn.h" +#include "utils/builtins.h" #include "utils/memutils.h" #include "utils/lsyscache.h" @@ -122,6 +124,7 @@ static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database, Oid extensionOwner); +static bool ExecuteRebalanceJob(RebalanceJob *job); /* * InitializeMaintenanceDaemon, called at server start, is responsible for @@ -850,6 +853,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("rebalance jobs worker"); + ereport(LOG, (errmsg("background jobs runner"))); + bool hasJobs = true; while (hasJobs) { @@ -872,9 +877,10 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) { ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); - /* TODO execute job */ - - UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE); + if (ExecuteRebalanceJob(job)) + { + UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE); + } } else { @@ -889,6 +895,40 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) } +static bool +ExecuteMoveRebalanceJob(RebalanceJob *job) +{ + DirectFunctionCall6Coll(citus_move_shard_placement, InvalidOid, + Int64GetDatum(job->jobArguments.move.shardId), + CStringGetTextDatum(job->jobArguments.move.sourceName), + Int32GetDatum(job->jobArguments.move.sourcePort), + CStringGetTextDatum(job->jobArguments.move.targetName), + Int32GetDatum(job->jobArguments.move.targetPort), + ObjectIdGetDatum(16598)); /* fix hardcoded value of 'block_writes' */ + + return true; +} + + +static bool +ExecuteRebalanceJob(RebalanceJob *job) +{ + switch (job->jobType) + { + case REBALANCE_JOB_TYPE_MOVE: + { + return ExecuteMoveRebalanceJob(job); + } + + default: + { + ereport(ERROR, (errmsg("undefined rebalance job for jobid: %ld", + job->jobid))); + } + } +} + + /* * MaintenanceDaemonShmemSize computes how much shared memory is required. */ diff --git a/src/include/distributed/repair_shards.h b/src/include/distributed/repair_shards.h index fa0d76190..af358881a 100644 --- a/src/include/distributed/repair_shards.h +++ b/src/include/distributed/repair_shards.h @@ -17,3 +17,5 @@ extern void ErrorIfMoveUnsupportedTableType(Oid relationId); extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList, char *snapshotName); extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); + +extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);