From ec292cde48d9236142cb8f0087080a871c42ce32 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 2 Aug 2022 15:37:27 +0200 Subject: [PATCH] WIP refactor background job execution --- .../distributed/metadata/metadata_utility.c | 165 +----- src/backend/distributed/shared_library_init.c | 1 + .../distributed/utils/background_jobs.c | 537 ++++++++++++++++++ src/backend/distributed/utils/maintenanced.c | 206 +------ src/include/distributed/background_jobs.h | 15 + src/include/distributed/maintenanced.h | 1 - src/include/distributed/metadata_utility.h | 3 +- src/include/distributed/shard_cleaner.h | 1 - 8 files changed, 589 insertions(+), 340 deletions(-) create mode 100644 src/backend/distributed/utils/background_jobs.c create mode 100644 src/include/distributed/background_jobs.h diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index a5959ad30..a7ba887a5 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -2667,7 +2667,8 @@ GetScheduledRebalanceJobByJobID(int64 jobId) void -UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) +UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry_count, + char *message) { Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), RowExclusiveLock); @@ -2678,7 +2679,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) /* WHERE jobid = job->jobid */ ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid)); + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid)); const bool indexOK = true; SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, @@ -2690,69 +2691,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) if (!HeapTupleIsValid(heapTuple)) { ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " - UINT64_FORMAT, job->jobid))); - } - - Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; - bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; - - values[Anum_pg_dist_rebalance_jobs_status - 1] = - ObjectIdGetDatum(RebalanceJobStatusOid(newStatus)); - isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; - - /* TODO figure out a nice way on how to update a tuple selectively */ - if (newStatus == REBALANCE_JOB_STATUS_RUNNING) - { - /* update pid for running status */ - values[Anum_pg_dist_rebalance_jobs_pid - 1] = Int32GetDatum((int32) MyProcPid); - isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - } - else - { - values[Anum_pg_dist_rebalance_jobs_pid - 1] = 0; - isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - } - - heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); - - CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); - - CommandCounterIncrement(); - - systable_endscan(scanDescriptor); - table_close(pgDistRebalanceJobs, NoLock); -} - - -bool -UpdateJobError(RebalanceJob *job, ErrorData *edata) -{ - Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), - RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); - - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - - /* WHERE jobid = job->jobid */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid)); - - const bool indexOK = true; - SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, - DistRebalanceJobsJobsIdIndexId(), - indexOK, - NULL, scanKeyCount, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " - UINT64_FORMAT, job->jobid))); + UINT64_FORMAT, jobid))); } Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; @@ -2761,79 +2700,45 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata) heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); - /* increment retry count */ - int retryCount = 0; - if (!isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1]) +#define UPDATE_FIELD(field, newNull, newValue) \ + replace[(field - 1)] = ((newNull != isnull[(field - 1)]) || (values[(field - 1)] != \ + newValue)); \ + isnull[(field - 1)] = (newNull); \ + values[(field - 1)] = (newValue); + + if (pid) { - retryCount = DatumGetInt32(values[Anum_pg_dist_rebalance_jobs_retry_count - 1]); - retryCount++; + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, false, Int32GetDatum(*pid)); } - values[Anum_pg_dist_rebalance_jobs_retry_count - 1] = Int32GetDatum(retryCount); - isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true; - - values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid; - isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true; - - bool statusError = false; - if (retryCount >= 3) + else { - /* after 3 failures we will transition the job to error and stop executing */ - values[Anum_pg_dist_rebalance_jobs_status - 1] = - ObjectIdGetDatum(JobStatusErrorId()); - isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; - - statusError = true; + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, true, InvalidOid); } - StringInfoData buf = { 0 }; - initStringInfo(&buf); + Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status)); + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_status, false, statusOid); - if (edata->message) + if (retry_count) { - if (buf.len > 0) - { - appendStringInfo(&buf, "\n"); - } - appendStringInfoString(&buf, "ERROR: "); - appendStringInfoString(&buf, edata->message); + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, false, Int32GetDatum( + *retry_count)); + } + else + { + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, true, InvalidOid); } - if (edata->hint) + if (message) { - if (buf.len > 0) - { - appendStringInfo(&buf, "\n"); - } - appendStringInfoString(&buf, "HINT: "); - appendStringInfoString(&buf, edata->hint); + Oid messageOid = CStringGetTextDatum(message); + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, false, messageOid); + } + else + { + UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, true, InvalidOid); } - if (edata->detail) - { - if (buf.len > 0) - { - appendStringInfo(&buf, "\n"); - } - appendStringInfoString(&buf, "DETAIL: "); - appendStringInfoString(&buf, edata->detail); - } - - if (edata->context) - { - if (buf.len > 0) - { - appendStringInfo(&buf, "\n"); - } - appendStringInfoString(&buf, "CONTEXT: "); - appendStringInfoString(&buf, edata->context); - } - - values[Anum_pg_dist_rebalance_jobs_message - 1] = CStringGetTextDatum(buf.data); - isnull[Anum_pg_dist_rebalance_jobs_message - 1] = false; - replace[Anum_pg_dist_rebalance_jobs_message - 1] = true; +#undef UPDATE_FIELD heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2843,14 +2748,6 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata) systable_endscan(scanDescriptor); table_close(pgDistRebalanceJobs, NoLock); - - /* when we have changed the status to Error we will need to unschedule all dependent jobs (recursively) */ - if (statusError) - { - UnscheduleDependantJobs(job->jobid); - } - - return statusError; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 4bd64d51a..7129b10dc 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -32,6 +32,7 @@ #include "common/string.h" #include "executor/executor.h" #include "distributed/backend_data.h" +#include "distributed/background_jobs.h" #include "distributed/citus_depended_object.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_safe_lib.h" diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c new file mode 100644 index 000000000..e37713c87 --- /dev/null +++ b/src/backend/distributed/utils/background_jobs.c @@ -0,0 +1,537 @@ + +#include "postgres.h" + +#include "safe_mem_lib.h" + +#include "libpq/pqmq.h" +#include "parser/analyze.h" +#include "pgstat.h" +#include "storage/dsm.h" +#include "storage/ipc.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" +#include "tcop/utility.h" +#include "utils/backend_status.h" +#include "utils/memutils.h" +#include "utils/portal.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" +#include "utils/timeout.h" + +#include "distributed/background_jobs.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/listutils.h" +#include "distributed/maintenanced.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" +#include "distributed/shard_cleaner.h" + +bool RebalanceJobDebugDelay = false; + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define CITUS_BACKGROUND_JOB_MAGIC 0x51028081 +#define CITUS_BACKGROUND_JOB_KEY_DATABASE 0 +#define CITUS_BACKGROUND_JOB_KEY_USERNAME 1 +#define CITUS_BACKGROUND_JOB_KEY_COMMAND 2 +#define CITUS_BACKGROUND_JOB_KEY_QUEUE 3 +#define CITUS_BACKGROUND_JOB_NKEYS 4 + +static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database, + char *user, + char *command); +static void ExecuteSqlString(const char *sql); + +BackgroundWorkerHandle * +StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner) +{ + BackgroundWorker worker = { 0 }; + BackgroundWorkerHandle *handle = NULL; + + /* Configure a worker. */ + memset(&worker, 0, sizeof(worker)); + SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Rebalance Jobs Worker: %u/%u", + database, extensionOwner); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* don't restart, we manage restarts from maintenance daemon */ + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), + "CitusBackgroundJobMain"); + worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); + memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, + sizeof(Oid)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + return NULL; + } + + pid_t pid; + WaitForBackgroundWorkerStartup(handle, &pid); + + return handle; +} + + +void +CitusBackgroundJobMain(Datum arg) +{ + Oid databaseOid = DatumGetObjectId(arg); + + /* extension owner is passed via bgw_extra */ + Oid extensionOwner = InvalidOid; + memcpy_s(&extensionOwner, sizeof(extensionOwner), + MyBgworkerEntry->bgw_extra, sizeof(Oid)); + + BackgroundWorkerUnblockSignals(); + + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + + /* make worker recognizable in pg_stat_activity */ + pgstat_report_appname("rebalance jobs worker"); + + ereport(LOG, (errmsg("background jobs runner"))); + + if (RebalanceJobDebugDelay) + { + pg_usleep(30 * 1000 * 1000); + } + + MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext, + "PerJobContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * First we find all jobs that are running, we need to check if they are still running + * if not reset their state back to scheduled. + */ + { + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* TODO have an actual function to check if the worker is still running */ + ResetRunningJobs(); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + + + MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext); + bool hasJobs = true; + while (hasJobs) + { + MemoryContextReset(perJobContext); + + CHECK_FOR_INTERRUPTS(); + + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * We need to load the job into the perJobContext as we will switch contexts + * later due to the committing and starting of new transactions + */ + MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); + RebalanceJob *job = GetRunableRebalanceJob(); + MemoryContextSwitchTo(oldContext); + + if (!job) + { + PopActiveSnapshot(); + CommitTransactionCommand(); + + hasJobs = false; + break; + } + + PopActiveSnapshot(); + CommitTransactionCommand(); + + MemoryContextSwitchTo(perJobContext); + + /* TODO find the actual database and username */ + BackgroundWorkerHandle *handle = + StartCitusBackgroundJobExecuter("postgres", "nilsdijk", job->command); + + if (handle == NULL) + { + /* TODO something better here */ + ereport(ERROR, (errmsg("unable to start background worker"))); + } + + pid_t pid = 0; + GetBackgroundWorkerPid(handle, &pid); + + ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Update job status to indicate it is running */ + UpdateJobStatus(job->jobid, &pid, REBALANCE_JOB_STATUS_RUNNING, NULL, NULL); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + MemoryContextSwitchTo(perJobContext); + + /* TODO keep polling the job */ + while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED) + { + int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; + int rc = WaitLatch(MyLatch, latchFlags, (long) 1000, PG_WAIT_EXTENSION); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + proc_exit(1); + } + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* TODO job can actually also have failed*/ + UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_DONE, NULL, NULL); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + + MemoryContextSwitchTo(oldContextPerJob); + MemoryContextDelete(perJobContext); +} + + +static dsm_segment * +StoreArgumentsInDSM(char *database, char *username, char *command) +{ + /* + * Create the shared memory that we will pass to the background + * worker process. We use DSM_CREATE_NULL_IF_MAXSEGMENTS so that we + * do not ERROR here. This way, we can mark the job as failed and + * keep the launcher process running normally. + */ + shm_toc_estimator e = { 0 }; + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, strlen(database) + 1); + shm_toc_estimate_chunk(&e, strlen(username) + 1); + shm_toc_estimate_chunk(&e, strlen(command) + 1); +#define QUEUE_SIZE ((Size) 65536) + shm_toc_estimate_chunk(&e, QUEUE_SIZE); + shm_toc_estimate_keys(&e, CITUS_BACKGROUND_JOB_NKEYS); + Size segsize = shm_toc_estimate(&e); + + dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + { + ereport(ERROR, + (errmsg("max number of DSM segments may has been reached"))); + + return NULL; + } + + shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg), + segsize); + + Size size = strlen(database) + 1; + char *databaseTarget = shm_toc_allocate(toc, size); + strcpy_s(databaseTarget, size, database); + shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, databaseTarget); + + size = strlen(username) + 1; + char *usernameTarget = shm_toc_allocate(toc, size); + strcpy_s(usernameTarget, size, username); + shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, usernameTarget); + + size = strlen(command) + 1; + char *commandTarget = shm_toc_allocate(toc, size); + strcpy_s(commandTarget, size, command); + shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, commandTarget); + + shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE); + shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, mq); + shm_mq_set_receiver(mq, MyProc); + + /* + * Attach the queue before launching a worker, so that we'll automatically + * detach the queue if we error out. (Otherwise, the worker might sit + * there trying to write the queue long after we've gone away.) + */ + MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext); + shm_mq_attach(mq, seg, NULL); + MemoryContextSwitchTo(oldcontext); + + return seg; +} + + +static BackgroundWorkerHandle * +StartCitusBackgroundJobExecuter(char *database, char *username, char *command) +{ + dsm_segment *seg = StoreArgumentsInDSM(database, username, command); + + /* Configure a worker. */ + BackgroundWorker worker = { 0 }; + memset(&worker, 0, sizeof(worker)); + SafeSnprintf(worker.bgw_name, BGW_MAXLEN, "Citus Background Job Executor: %s/%s", + database, username); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* don't restart, we manage restarts from maintenance daemon */ + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), + "CitusBackgroundJobExecuter"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + BackgroundWorkerHandle *handle = NULL; + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + dsm_detach(seg); + return NULL; + } + + pid_t pid = { 0 }; + WaitForBackgroundWorkerStartup(handle, &pid); + + return handle; +} + + +/* + * Background worker logic. + * + * based on the background worker logic in pgcron + */ +void +CitusBackgroundJobExecuter(Datum main_arg) +{ + /* + * TODO figure out if we need this signal handler that is in pgcron + * pqsignal(SIGTERM, pg_cron_background_worker_sigterm); + */ + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "citus background job"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "citus background job execution", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* Set up a dynamic shared memory segment. */ + dsm_segment *seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + } + + shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + } + + char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, false); + char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, false); + char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, false); + shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, false); + + shm_mq_set_sender(mq, MyProc); + shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(seg, responseq); + + BackgroundWorkerInitializeConnection(database, username, 0); + + /* Prepare to execute the query. */ + SetCurrentStatementStartTimestamp(); + debug_query_string = command; + pgstat_report_activity(STATE_RUNNING, command); + StartTransactionCommand(); + if (StatementTimeout > 0) + { + enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout); + } + else + { + disable_timeout(STATEMENT_TIMEOUT, false); + } + + /* Execute the query. */ + ExecuteSqlString(command); + + /* Post-execution cleanup. */ + disable_timeout(STATEMENT_TIMEOUT, false); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, command); + pgstat_report_stat(true); + + /* Signal that we are done. */ + ReadyForQuery(DestRemote); + + dsm_detach(seg); + proc_exit(0); +} + + +/* + * Execute given SQL string without SPI or a libpq session. + */ +static void +ExecuteSqlString(const char *sql) +{ + /* + * Parse the SQL string into a list of raw parse trees. + * + * Because we allow statements that perform internal transaction control, + * we can't do this in TopTransactionContext; the parse trees might get + * blown away before we're done executing them. + */ + MemoryContext parsecontext = AllocSetContextCreate(CurrentMemoryContext, + "query parse/plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldcontext = MemoryContextSwitchTo(parsecontext); + List *raw_parsetree_list = pg_parse_query(sql); + int commands_remaining = list_length(raw_parsetree_list); + bool isTopLevel = commands_remaining == 1; + MemoryContextSwitchTo(oldcontext); + + /* + * Do parse analysis, rule rewrite, planning, and execution for each raw + * parsetree. We must fully execute each query before beginning parse + * analysis on the next one, since there may be interdependencies. + */ + RawStmt *parsetree = NULL; + foreach_ptr(parsetree, raw_parsetree_list) + { + /* + * We don't allow transaction-control commands like COMMIT and ABORT + * here. The entire SQL statement is executed as a single transaction + * which commits if no errors are encountered. + */ + if (IsA(parsetree, TransactionStmt)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "transaction control statements are not allowed in background job"))); + } + + /* + * Get the command name for use in status display (it also becomes the + * default completion tag, down inside PortalRun). Set ps_status and + * do any special start-of-SQL-command processing needed by the + * destination. + */ + CommandTag commandTag = CreateCommandTag(parsetree->stmt); + set_ps_display(GetCommandTagName(commandTag)); + BeginCommand(commandTag, DestNone); + + /* Set up a snapshot if parse analysis/planning will need one. */ + bool snapshot_set = false; + if (analyze_requires_snapshot(parsetree)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + /* + * OK to analyze, rewrite, and plan this query. + * + * As with parsing, we need to make sure this data outlives the + * transaction, because of the possibility that the statement might + * perform internal transaction control. + */ + oldcontext = MemoryContextSwitchTo(parsecontext); + +#if PG_VERSION_NUM >= 150000 + List *querytree_list = + pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL); +#else + List *querytree_list = + pg_analyze_and_rewrite(parsetree, sql, NULL, 0, NULL); +#endif + + List *plantree_list = pg_plan_queries(querytree_list, sql, 0, NULL); + + /* Done with the snapshot used for parsing/planning */ + if (snapshot_set) + { + PopActiveSnapshot(); + } + + /* If we got a cancel signal in analysis or planning, quit */ + CHECK_FOR_INTERRUPTS(); + + /* + * Execute the query using the unnamed portal. + */ + Portal portal = CreatePortal("", true, true); + + /* Don't display the portal in pg_cursors */ + portal->visible = false; + PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL); + PortalStart(portal, NULL, 0, InvalidSnapshot); + int16 format[] = { 1 }; + PortalSetResultFormat(portal, lengthof(format), format); /* binary format */ + + commands_remaining--; + DestReceiver *receiver = CreateDestReceiver(DestNone); + + /* + * Only once the portal and destreceiver have been established can + * we return to the transaction context. All that stuff needs to + * survive an internal commit inside PortalRun! + */ + MemoryContextSwitchTo(oldcontext); + + /* Here's where we actually execute the command. */ + QueryCompletion qc = { 0 }; + (void) PortalRun(portal, FETCH_ALL, isTopLevel, true, receiver, receiver, &qc); + + /* Clean up the receiver. */ + (*receiver->rDestroy)(receiver); + + /* + * Send a CommandComplete message even if we suppressed the query + * results. The user backend will report these in the absence of + * any true query results. + */ + EndCommand(&qc, DestRemote, false); + + /* Clean up the portal. */ + PortalDrop(portal, false); + } + + /* Be sure to advance the command counter after the last script command */ + CommandCounterIncrement(); +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 23306c29c..2cd16b0c7 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -33,6 +33,7 @@ #include "commands/extension.h" #include "libpq/pqsignal.h" #include "catalog/namespace.h" +#include "distributed/background_jobs.h" #include "distributed/citus_safe_lib.h" #include "distributed/distributed_deadlock_detection.h" #include "distributed/maintenanced.h" @@ -97,7 +98,6 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int RebalanceCheckInterval = 1000; -bool RebalanceJobDebugDelay = false; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; @@ -124,9 +124,6 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonErrorContext(void *arg); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); -static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database, - Oid extensionOwner); -static bool ExecuteRebalanceJob(RebalanceJob *job); /* * InitializeMaintenanceDaemon, called at server start, is responsible for @@ -733,8 +730,8 @@ CitusMaintenanceDaemonMain(Datum main_arg) "Starting background worker for execution."))); rebalanceBgwHandle = - StartRebalanceJobsBackgroundWorker(MyDatabaseId, - myDbData->userOid); + StartCitusBackgroundJobWorker(MyDatabaseId, + myDbData->userOid); if (!rebalanceBgwHandle || GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) == @@ -800,203 +797,6 @@ CitusMaintenanceDaemonMain(Datum main_arg) } -static BackgroundWorkerHandle * -StartRebalanceJobsBackgroundWorker(Oid database, Oid extensionOwner) -{ - BackgroundWorker worker; - BackgroundWorkerHandle *handle = NULL; - - /* Configure a worker. */ - memset(&worker, 0, sizeof(worker)); - SafeSnprintf(worker.bgw_name, BGW_MAXLEN, - "Citus Rebalance Jobs Worker: %u/%u", - database, extensionOwner); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - - /* don't restart, we manage restarts from maintenance daemon */ - worker.bgw_restart_time = BGW_NEVER_RESTART; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name), - "RebalanceJobsBackgroundWorkerMain"); - worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId); - memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, - sizeof(Oid)); - worker.bgw_notify_pid = MyProcPid; - - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - { - return NULL; - } - - pid_t pid; - WaitForBackgroundWorkerStartup(handle, &pid); - - return handle; -} - - -void -RebalanceJobsBackgroundWorkerMain(Datum arg) -{ - Oid databaseOid = DatumGetObjectId(arg); - - /* extension owner is passed via bgw_extra */ - Oid extensionOwner = InvalidOid; - memcpy_s(&extensionOwner, sizeof(extensionOwner), - MyBgworkerEntry->bgw_extra, sizeof(Oid)); - - BackgroundWorkerUnblockSignals(); - - /* connect to database, after that we can actually access catalogs */ - BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); - - /* make worker recognizable in pg_stat_activity */ - pgstat_report_appname("rebalance jobs worker"); - - ereport(LOG, (errmsg("background jobs runner"))); - - if (RebalanceJobDebugDelay) - { - pg_usleep(30 * 1000 * 1000); - } - - MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext, - "PerJobContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - /* - * First we find all jobs that are running, we need to check if they are still running - * if not reset their state back to scheduled. - */ - { - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* TODO have an actual function to check if the worker is still running */ - ResetRunningJobs(); - - PopActiveSnapshot(); - CommitTransactionCommand(); - } - - - MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext); - bool hasJobs = true; - while (hasJobs) - { - MemoryContextReset(perJobContext); - - CHECK_FOR_INTERRUPTS(); - - InvalidateMetadataSystemCache(); - StartTransactionCommand(); - - PushActiveSnapshot(GetTransactionSnapshot()); - - if (!LockCitusExtension()) - { - ereport(DEBUG1, (errmsg("could not lock the citus extension, " - "skipping metadata sync"))); - } - else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) - { - /* - * We need to load the job into the perJobContext as we will switch contexts - * later due to the committing and starting of new transactions - */ - MemoryContext oldContext = MemoryContextSwitchTo(perJobContext); - RebalanceJob *job = GetRunableRebalanceJob(); - MemoryContextSwitchTo(oldContext); - - if (!job) - { - PopActiveSnapshot(); - CommitTransactionCommand(); - - hasJobs = false; - break; - } - - ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid))); - - /* Update job status to indicate it is running */ - UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING); - - PopActiveSnapshot(); - CommitTransactionCommand(); - - MemoryContext savedContext = CurrentMemoryContext; - PG_TRY(); - { - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - if (ExecuteRebalanceJob(job)) - { - UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE); - - PopActiveSnapshot(); - CommitTransactionCommand(); - } - } - PG_CATCH(); - { - MemoryContextSwitchTo(savedContext); - - ErrorData *edata = CopyErrorData(); - FlushErrorState(); - - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - UpdateJobError(job, edata); - - PopActiveSnapshot(); - CommitTransactionCommand(); - - FreeErrorData(edata); - edata = NULL; - - /* TODO log that there was an error */ - } - PG_END_TRY(); - } - } - - MemoryContextSwitchTo(oldContextPerJob); - MemoryContextDelete(perJobContext); -} - - -static bool -ExecuteRebalanceJob(RebalanceJob *job) -{ - int spiResult = SPI_connect(); - if (spiResult != SPI_OK_CONNECT) - { - ereport(ERROR, (errmsg("could not connect to SPI manager"))); - } - - spiResult = SPI_execute(job->command, false, 0); - -/* if (spiResult != SPIOK) */ -/* { */ -/* ereport(ERROR, (errmsg("could not run SPI query"))); */ -/* } */ - - spiResult = SPI_finish(); - if (spiResult != SPI_OK_FINISH) - { - ereport(ERROR, (errmsg("could not finish SPI connection"))); - } - - return true; -} - - /* * MaintenanceDaemonShmemSize computes how much shared memory is required. */ diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h new file mode 100644 index 000000000..796541abf --- /dev/null +++ b/src/include/distributed/background_jobs.h @@ -0,0 +1,15 @@ +#ifndef CITUS_BACKGROUND_JOBS_H +#define CITUS_BACKGROUND_JOBS_H + +#import "postgres.h" + +#import "postmaster/bgworker.h" + +extern BackgroundWorkerHandle * StartCitusBackgroundJobWorker(Oid database, Oid + extensionOwner); +extern void CitusBackgroundJobMain(Datum arg); +extern void CitusBackgroundJobExecuter(Datum main_arg); + +extern bool RebalanceJobDebugDelay; + +#endif /*CITUS_BACKGROUND_JOBS_H */ diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index 306fab5d5..a09d89085 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -30,6 +30,5 @@ extern void InitializeMaintenanceDaemonBackend(void); extern bool LockCitusExtension(void); extern void CitusMaintenanceDaemonMain(Datum main_arg); -extern void RebalanceJobsBackgroundWorkerMain(Datum arg); #endif /* MAINTENANCED_H */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 071c3ad6b..88e5cec21 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -338,7 +338,8 @@ extern bool JobHasUmnetDependencies(int64 jobid); extern RebalanceJob * GetRunableRebalanceJob(void); extern void ResetRunningJobs(void); extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId); -extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus); +extern void UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, + int32 *retry_count, char *message); extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata); extern void UnscheduleDependantJobs(int64 jobid); extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status); diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 00311e951..14ced520d 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -14,7 +14,6 @@ /* GUC to configure deferred shard deletion */ extern int DeferShardDeleteInterval; extern int RebalanceCheckInterval; -extern bool RebalanceJobDebugDelay; extern bool DeferShardDeleteOnMove; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove;