diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index e31498fe4..44ff74145 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1991,10 +1991,11 @@ ExecuteQueryViaSPI(char *query, int SPIOK) } spiResult = SPI_execute(query, false, 0); - if (spiResult != SPIOK) - { - ereport(ERROR, (errmsg("could not run SPI query"))); - } + +/* if (spiResult != SPIOK) */ +/* { */ +/* ereport(ERROR, (errmsg("could not run SPI query"))); */ +/* } */ spiResult = SPI_finish(); if (spiResult != SPI_OK_FINISH) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 811d76c23..69a1d40db 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -146,6 +146,7 @@ typedef struct MetadataCacheData Oid distRebalanceJobsStatusJobsIndexId; Oid jobStatusScheduledId; Oid jobStatusDoneId; + Oid jobStatusErrorId; Oid distRebalanceStrategyRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; @@ -3147,6 +3148,19 @@ JobStatusDoneId(void) } +Oid +JobStatusErrorId(void) +{ + if (!MetadataCache.jobStatusErrorId) + { + MetadataCache.jobStatusErrorId = + LookupStringEnumValueId("citus_job_status", "error"); + } + + return MetadataCache.jobStatusErrorId; +} + + /* * citus_dist_partition_cache_invalidate is a trigger function that performs * relcache invalidations when the contents of pg_dist_partition are changed diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 00be74921..8adb70a1d 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2292,66 +2292,6 @@ RebalanceJobStatusOid(RebalanceJobStatus status) } -static void -ParseMoveJob(RebalanceJob *target, Datum moveArgs) -{ - HeapTupleHeader t = DatumGetHeapTupleHeader(moveArgs); - bool isnull = false; - - Datum shardIdDatum = GetAttributeByName(t, "shard_id", &isnull); - if (isnull) - { - ereport(ERROR, (errmsg( - "shard_id for citus_move_shard_placement_arguments " - "can't be null"))); - } - uint64 shardId = DatumGetUInt64(shardIdDatum); - - Datum sourceNodeNameDatum = GetAttributeByName(t, "source_node_name", &isnull); - if (isnull) - { - ereport(ERROR, (errmsg( - "source_node_name for citus_move_shard_placement_arguments " - "can't be null"))); - } - text *sourceNodeNameText = DatumGetTextP(sourceNodeNameDatum); - - Datum sourceNodePortDatum = GetAttributeByName(t, "source_node_port", &isnull); - if (isnull) - { - ereport(ERROR, (errmsg( - "source_node_port for citus_move_shard_placement_arguments " - "can't be null"))); - } - int32 sourceNodePort = DatumGetInt32(sourceNodePortDatum); - - Datum targetNodeNameDatum = GetAttributeByName(t, "target_node_name", &isnull); - if (isnull) - { - ereport(ERROR, (errmsg( - "target_node_name for citus_move_shard_placement_arguments " - "can't be null"))); - } - text *targetNodeNameText = DatumGetTextP(targetNodeNameDatum); - - Datum targetNodePortDatum = GetAttributeByName(t, "target_node_port", &isnull); - if (isnull) - { - ereport(ERROR, (errmsg( - "target_node_port for citus_move_shard_placement_arguments " - "can't be null"))); - } - int32 targetNodePort = DatumGetInt32(targetNodePortDatum); - - target->jobType = REBALANCE_JOB_TYPE_MOVE; - target->jobArguments.move.shardId = shardId; - target->jobArguments.move.sourceName = text_to_cstring(sourceNodeNameText); - target->jobArguments.move.sourcePort = sourceNodePort; - target->jobArguments.move.targetName = text_to_cstring(targetNodeNameText); - target->jobArguments.move.targetPort = targetNodePort; -} - - RebalanceJob * GetScheduledRebalanceJob(void) { @@ -2387,17 +2327,8 @@ GetScheduledRebalanceJob(void) TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); - if (!isNullArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]) - { - /* citus_move_shard_placement job */ - ParseMoveJob( - job, - datumArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]); - } - else - { - ereport(ERROR, (errmsg("undefined job type"))); - } + job->command = text_to_cstring( + DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1])); } systable_endscan(scanDescriptor); @@ -2452,3 +2383,114 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) systable_endscan(scanDescriptor); table_close(pgDistRebalanceJobs, NoLock); } + + +void +UpdateJobError(RebalanceJob *job, ErrorData *edata) +{ + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + /* WHERE jobid = job->jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid)); + + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsJobsIdIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " + UINT64_FORMAT, job->jobid))); + } + + Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + + heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); + + /* increment retry count */ + int retryCount = 0; + if (!isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1]) + { + retryCount = DatumGetInt32(values[Anum_pg_dist_rebalance_jobs_retry_count - 1]); + retryCount++; + } + values[Anum_pg_dist_rebalance_jobs_retry_count - 1] = Int32GetDatum(retryCount); + isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true; + + if (retryCount >= 3) + { + /* after 3 failures we will transition the job to error and stop executing */ + values[Anum_pg_dist_rebalance_jobs_status - 1] = + ObjectIdGetDatum(JobStatusErrorId()); + isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + } + + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + if (edata->message) + { + if (buf.len > 0) + { + appendStringInfo(&buf, "\n"); + } + appendStringInfoString(&buf, "ERROR: "); + appendStringInfoString(&buf, edata->message); + } + + if (edata->hint) + { + if (buf.len > 0) + { + appendStringInfo(&buf, "\n"); + } + appendStringInfoString(&buf, "HINT: "); + appendStringInfoString(&buf, edata->hint); + } + + if (edata->detail) + { + if (buf.len > 0) + { + appendStringInfo(&buf, "\n"); + } + appendStringInfoString(&buf, "DETAIL: "); + appendStringInfoString(&buf, edata->detail); + } + + if (edata->context) + { + if (buf.len > 0) + { + appendStringInfo(&buf, "\n"); + } + appendStringInfoString(&buf, "CONTEXT: "); + appendStringInfoString(&buf, edata->context); + } + + values[Anum_pg_dist_rebalance_jobs_message - 1] = CStringGetTextDatum(buf.data); + isnull[Anum_pg_dist_rebalance_jobs_message - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_message - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + + CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); + + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobs, NoLock); +} diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 75271e314..f8a0cae42 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -77,25 +77,17 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" -CREATE TYPE citus.citus_move_shard_placement_arguments AS ( - shard_id bigint, - source_node_name text, - source_node_port integer, - target_node_name text, - target_node_port integer, - shard_transfer_mode citus.shard_transfer_mode -); -ALTER TYPE citus.citus_move_shard_placement_arguments SET SCHEMA pg_catalog; - -CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done'); +CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done', 'error'); ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog; CREATE TABLE citus.pg_dist_rebalance_jobs( jobid bigserial NOT NULL, - status pg_catalog.citus_job_status default 'scheduled', - citus_move_shard_placement pg_catalog.citus_move_shard_placement_arguments + status pg_catalog.citus_job_status default 'scheduled' NOT NULL, + command text NOT NULL, + retry_count integer, + message text ); --- SELECT granted to PUBLIC in upgrade script + ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog; CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid); CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index a3de143ec..bc0c71295 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -19,6 +19,7 @@ #include "distributed/pg_version_constants.h" #include +#include #include "miscadmin.h" #include "pgstat.h" @@ -855,6 +856,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) ereport(LOG, (errmsg("background jobs runner"))); +/* pg_usleep(30 * 1000 * 1000); */ + bool hasJobs = true; while (hasJobs) { @@ -876,11 +879,35 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) if (job) { ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); + MemoryContext savedContext = CurrentMemoryContext; + BeginInternalSubTransaction(NULL); - if (ExecuteRebalanceJob(job)) + PG_TRY(); { - UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE); + if (ExecuteRebalanceJob(job)) + { + UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE); + } + + ReleaseCurrentSubTransaction(); } + PG_CATCH(); + { + MemoryContextSwitchTo(savedContext); + + ErrorData *edata = CopyErrorData(); + FlushErrorState(); + + RollbackAndReleaseCurrentSubTransaction(); + + UpdateJobError(job, edata); + + FreeErrorData(edata); + edata = NULL; + + /* TODO log that there was an error */ + } + PG_END_TRY(); } else { @@ -895,37 +922,29 @@ RebalanceJobsBackgroundWorkerMain(Datum arg) } -static bool -ExecuteMoveRebalanceJob(RebalanceJob *job) -{ - DirectFunctionCall6Coll(citus_move_shard_placement, InvalidOid, - Int64GetDatum(job->jobArguments.move.shardId), - CStringGetTextDatum(job->jobArguments.move.sourceName), - Int32GetDatum(job->jobArguments.move.sourcePort), - CStringGetTextDatum(job->jobArguments.move.targetName), - Int32GetDatum(job->jobArguments.move.targetPort), - ObjectIdGetDatum(16598)); /* fix hardcoded value of 'block_writes' */ - - return true; -} - - static bool ExecuteRebalanceJob(RebalanceJob *job) { - switch (job->jobType) + int spiResult = SPI_connect(); + if (spiResult != SPI_OK_CONNECT) { - case REBALANCE_JOB_TYPE_MOVE: - { - return ExecuteMoveRebalanceJob(job); - } - - default: - { - ereport(ERROR, (errmsg("undefined rebalance job for jobid: %ld", - job->jobid))); - } + ereport(ERROR, (errmsg("could not connect to SPI manager"))); } + + spiResult = SPI_execute(job->command, false, 0); + +/* if (spiResult != SPIOK) */ +/* { */ +/* ereport(ERROR, (errmsg("could not run SPI query"))); */ +/* } */ + + spiResult = SPI_finish(); + if (spiResult != SPI_OK_FINISH) + { + ereport(ERROR, (errmsg("could not finish SPI connection"))); + } + + return true; } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 06b664470..7a3957dcf 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -271,8 +271,9 @@ extern Oid SecondaryNodeRoleId(void); extern Oid CitusCopyFormatTypeId(void); extern Oid TextCopyFormatId(void); extern Oid BinaryCopyFormatId(void); -extern Oid JobStatusDoneId(void); extern Oid JobStatusScheduledId(void); +extern Oid JobStatusDoneId(void); +extern Oid JobStatusErrorId(void); /* user related functions */ extern Oid CitusExtensionOwner(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 87a8e9931..109234e83 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -211,29 +211,12 @@ typedef enum RebalanceJobStatus REBALANCE_JOB_STATUS_DONE } RebalanceJobStatus; -typedef enum RebalanceJobType -{ - REBALANCE_JOB_TYPE_UNKNOWN, - REBALANCE_JOB_TYPE_MOVE -} RebalanceJobType; typedef struct RebalanceJob { int64 jobid; RebalanceJobStatus status; - - RebalanceJobType jobType; - union - { - struct - { - uint32 shardId; - char *sourceName; - int32 sourcePort; - char *targetName; - int32 targetPort; - } move; - } jobArguments; + char *command; } RebalanceJob; @@ -347,4 +330,5 @@ extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern bool HasScheduledRebalanceJobs(void); extern RebalanceJob * GetScheduledRebalanceJob(void); extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); +extern void UpdateJobError(RebalanceJob *job, ErrorData *edata); #endif /* METADATA_UTILITY_H */ diff --git a/src/include/distributed/pg_dist_rebalance_jobs.h b/src/include/distributed/pg_dist_rebalance_jobs.h index 1073b6dc2..cddd08fa9 100644 --- a/src/include/distributed/pg_dist_rebalance_jobs.h +++ b/src/include/distributed/pg_dist_rebalance_jobs.h @@ -11,7 +11,9 @@ typedef struct FormData_pg_dist_rebalance_job int64 jobid; Oid status; #ifdef CATALOG_VARLEN /* variable-length fields start here */ - text citus_move_shard_placement; /* text? we need to understand how to read a variable length stored custon type */ + text command; + int32 retry_count; + text message; #endif } FormData_pg_dist_rebalance_job; @@ -26,9 +28,11 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job; * compiler constants for pg_dist_rebalance_jobs * ---------------- */ -#define Natts_pg_dist_rebalance_jobs 3 +#define Natts_pg_dist_rebalance_jobs 5 #define Anum_pg_dist_rebalance_jobs_jobid 1 #define Anum_pg_dist_rebalance_jobs_status 2 -#define Anum_pg_dist_rebalance_jobs_citus_move_shard_placement 3 +#define Anum_pg_dist_rebalance_jobs_command 3 +#define Anum_pg_dist_rebalance_jobs_retry_count 4 +#define Anum_pg_dist_rebalance_jobs_message 5 #endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */