From 37d1c78e7ddf37bade937f57b7d08551737cd338 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 22 Aug 2022 10:40:05 +0200 Subject: [PATCH] Initial background job rebalance details --- .../distributed/metadata/metadata_cache.c | 11 +++++ .../distributed/operations/shard_rebalancer.c | 42 +++++++++++++++++++ .../distributed/sql/citus--11.0-4--11.1-1.sql | 13 ++++++ .../distributed/utils/background_jobs.c | 2 +- src/include/distributed/metadata_cache.h | 1 + .../pg_dist_rebalance_task_details.h | 17 ++++++++ 6 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 src/include/distributed/pg_dist_rebalance_task_details.h diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5d7f4fd98..11dd380a0 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -161,6 +161,7 @@ typedef struct MetadataCacheData Oid citusTaskStatusErrorId; Oid citusTaskStatusSnscheduledId; Oid distRebalanceStrategyRelationId; + Oid distRebalanceTaskDetailsRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; Oid distLocalGroupRelationId; @@ -2474,6 +2475,16 @@ DistRebalanceStrategyRelationId(void) } +Oid +DistRebalanceTaskDetailsRelationId(void) +{ + CachedRelationLookup("pg_dist_rebalance_task_details", + &MetadataCache.distRebalanceTaskDetailsRelationId); + + return MetadataCache.distRebalanceTaskDetailsRelationId; +} + + /* return the oid of citus namespace */ Oid CitusCatalogNamespaceId(void) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 232316e1c..a73906eb3 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -41,6 +41,7 @@ #include "distributed/multi_progress.h" #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_rebalance_strategy.h" +#include 
"distributed/pg_dist_rebalance_task_details.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/remote_commands.h"
 #include "distributed/repair_shards.h"
@@ -1521,6 +1522,45 @@ NonColocatedDistRelationIdList(void)
 }
 
 
+/*
+ * CreateRebalanceTaskDetails inserts a row into pg_dist_rebalance_task_details
+ * recording the source node, target node and shard of one rebalance task.
+ */
+static void
+CreateRebalanceTaskDetails(int64 jobId, int64 taskId, WorkerNode *sourceNode,
+						   WorkerNode *targetNode, uint64 shardId)
+{
+	Relation pgDistRebalanceTaskDetails =
+		table_open(DistRebalanceTaskDetailsRelationId(), RowExclusiveLock);
+
+	/* every column is filled in below, so no attribute is left null */
+	Datum values[Natts_pg_dist_rebalance_task_details] = { 0 };
+	bool nulls[Natts_pg_dist_rebalance_task_details] = { 0 };
+
+	values[Anum_pg_dist_rebalance_task_details_job_id - 1] = Int64GetDatum(jobId);
+	values[Anum_pg_dist_rebalance_task_details_task_id - 1] = Int64GetDatum(taskId);
+
+	/* source_node and target_node are integer (int4) columns, not bigint */
+	values[Anum_pg_dist_rebalance_task_details_source_node - 1] =
+		Int32GetDatum(sourceNode->nodeId);
+	values[Anum_pg_dist_rebalance_task_details_target_node - 1] =
+		Int32GetDatum(targetNode->nodeId);
+
+	values[Anum_pg_dist_rebalance_task_details_shard_id - 1] = Int64GetDatum(shardId);
+
+	/* start out with an empty jsonb object, same as the column default */
+	values[Anum_pg_dist_rebalance_task_details_details - 1] =
+		DirectFunctionCall1(jsonb_in, CStringGetDatum("{}"));
+
+	HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistRebalanceTaskDetails),
+										 values, nulls);
+	CatalogTupleInsert(pgDistRebalanceTaskDetails, newTuple);
+
+	/* NoLock: keep the RowExclusiveLock until the transaction ends */
+	table_close(pgDistRebalanceTaskDetails, NoLock);
+}
+
+
 /*
  * RebalanceTableShards rebalances the shards for the relations inside the
  * relationIdList across the different workers.
@@ -1588,6 +1628,8 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid) BackgroundTask *task = ScheduleBackgroundTask(jobId, buf.data, first ? 0 : 1, &prevJobId); + CreateRebalanceTaskDetails(jobId, task->taskid, move->sourceNode, + move->targetNode, move->shardId); prevJobId = task->taskid; first = false; } diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 5a538b5a7..200b2cbd2 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -121,4 +121,17 @@ ALTER TABLE citus.pg_dist_background_tasks_depend SET SCHEMA pg_catalog; CREATE INDEX pg_dist_background_tasks_depend_task_id ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, task_id); CREATE INDEX pg_dist_background_tasks_depend_depends_on ON pg_catalog.pg_dist_background_tasks_depend USING btree(job_id, depends_on); +CREATE TABLE citus.pg_dist_rebalance_task_details( + job_id bigint NOT NULL, + task_id bigint NOT NULL, + source_node integer NOT NULL, + target_node integer NOT NULL, + shard_id bigint NOT NULL, + details jsonb NOT NULL DEFAULT '{}', + + UNIQUE(job_id, task_id), + FOREIGN KEY (job_id, task_id) REFERENCES pg_catalog.pg_dist_background_tasks (job_id, task_id) ON DELETE CASCADE +); +ALTER TABLE citus.pg_dist_rebalance_task_details SET SCHEMA pg_catalog; + #include "udfs/citus_job_wait/11.1-1.sql" diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 9774f40b2..4db368cf1 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -217,7 +217,7 @@ CitusBackgroundTaskMonitorMain(Datum arg) /* TODO find the actual database and username */ dsm_segment *seg = NULL; BackgroundWorkerHandle *handle = - StartCitusBackgroundJobExecuter("postgres", "nilsdijk", task->command, + 
StartCitusBackgroundJobExecuter("postgres", "jelte", task->command, &seg); if (handle == NULL) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index d369eb023..eb36b7806 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -230,6 +230,7 @@ extern Oid DistNodeRelationId(void); extern Oid DistBackgroundJobsRelationId(void); extern Oid DistBackgroundTasksRelationId(void); extern Oid DistRebalanceStrategyRelationId(void); +extern Oid DistRebalanceTaskDetailsRelationId(void); extern Oid DistLocalGroupIdRelationId(void); extern Oid DistObjectRelationId(void); extern Oid DistEnabledCustomAggregatesId(void); diff --git a/src/include/distributed/pg_dist_rebalance_task_details.h b/src/include/distributed/pg_dist_rebalance_task_details.h new file mode 100644 index 000000000..9ec766c4c --- /dev/null +++ b/src/include/distributed/pg_dist_rebalance_task_details.h @@ -0,0 +1,17 @@ + +#ifndef CITUS_PG_DIST_REBALANCE_TASK_DETAILS_H +#define CITUS_PG_DIST_REBALANCE_TASK_DETAILS_H + +/* ---------------- + * compiler constants for pg_dist_rebalance_task_details + * ---------------- + */ +#define Natts_pg_dist_rebalance_task_details 6 +#define Anum_pg_dist_rebalance_task_details_job_id 1 +#define Anum_pg_dist_rebalance_task_details_task_id 2 +#define Anum_pg_dist_rebalance_task_details_source_node 3 +#define Anum_pg_dist_rebalance_task_details_target_node 4 +#define Anum_pg_dist_rebalance_task_details_shard_id 5 +#define Anum_pg_dist_rebalance_task_details_details 6 + +#endif /* CITUS_PG_DIST_REBALANCE_TASK_DETAILS_H */