From 34fb13f3c8382417ed9d216089d384b170a84415 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Tue, 19 May 2020 15:20:46 +0300 Subject: [PATCH] Remove task_tracker.c methods --- src/backend/distributed/shared_library_init.c | 3 - .../test/run_from_same_connection.c | 1 + src/backend/distributed/worker/task_tracker.c | 1049 ----------------- .../distributed/intermediate_result_pruning.h | 1 + .../distributed/multi_server_executor.h | 39 - src/include/distributed/task_tracker.h | 108 -- 6 files changed, 2 insertions(+), 1199 deletions(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 9ea14c011..af128ab58 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -273,9 +273,6 @@ _PG_init(void) InitializeMaintenanceDaemon(); - /* organize that task tracker is started once server is up */ - TaskTrackerRegister(); - /* initialize coordinated transaction management */ InitializeTransactionManagement(); InitializeBackendManagement(); diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index 9d08a7cb2..29fc69638 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -18,6 +18,7 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/function_utils.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/lock_graph.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 0a72579f1..714a85323 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -57,1052 +57,3 @@ int TaskTrackerDelay = 200; /* process sleep interval in millisecs */ int MaxRunningTasksPerNode = 16; /* max number of running tasks */ int MaxTrackedTasksPerNode = 1024; /* max number of tracked tasks */ int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */ -WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */ - -/* Hash table shared by the task tracker and task tracker protocol functions */ -HTAB *TaskTrackerTaskHash = NULL; /* shared memory */ - -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; - -/* Flags set by interrupt handlers for later service in the main loop */ -static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t got_SIGTERM = false; - -/* initialization forward declarations */ -static Size TaskTrackerShmemSize(void); -static void TaskTrackerShmemInit(void); - -/* Signal handler forward declarations */ -static void TrackerSigHupHandler(SIGNAL_ARGS); -static void TrackerShutdownHandler(SIGNAL_ARGS); - -/* Local functions forward declarations */ -static void TrackerCleanupJobSchemas(void); -static void TrackerCleanupConnections(HTAB *WorkerTasksHash); -static void TrackerRegisterShutDown(HTAB *WorkerTasksHash); -static void TrackerDelayLoop(void); -static List * SchedulableTaskList(HTAB *WorkerTasksHash); -static WorkerTask * SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash); -static uint32 CountTasksMatchingCriteria(HTAB *WorkerTasksHash, - bool (*CriteriaFunction)(WorkerTask *)); -static bool RunningTask(WorkerTask *workerTask); -static bool SchedulableTask(WorkerTask *workerTask); -static int CompareTasksByTime(const void *first, const void *second); -static void ScheduleWorkerTasks(HTAB *WorkerTasksHash, List *schedulableTaskList); -static void ManageWorkerTasksHash(HTAB *WorkerTasksHash); -static void ManageWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash); -static void RemoveWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash); -static void CreateJobDirectoryIfNotExists(uint64 jobId); -static void MarkWorkerTaskAsFailed(WorkerTask *workerTask); -static int32 ConnectToLocalBackend(const char *databaseName, const char *userName); - - -/* Organize, at startup, that the task tracker is started */ -void -TaskTrackerRegister(void) -{ - BackgroundWorker worker; - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = TaskTrackerShmemInit; - - if (IsUnderPostmaster) - { - return; - } - - /* organize and register initialization of required shared memory */ - RequestAddinShmemSpace(TaskTrackerShmemSize()); - - /* and that the task tracker is started as background worker */ - memset(&worker, 0, sizeof(worker)); - worker.bgw_flags = BGWORKER_SHMEM_ACCESS; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = 1; - strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); - strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name), - "TaskTrackerMain"); - worker.bgw_notify_pid = 0; - strcpy_s(worker.bgw_name, sizeof(worker.bgw_name), "task tracker"); - - RegisterBackgroundWorker(&worker); -} - - -/* Main entry point for task tracker process. */ -void -TaskTrackerMain(Datum main_arg) -{ - sigjmp_buf local_sigjmp_buf; - static bool processStartUp = true; - - /* Properly accept or ignore signals the postmaster might send us */ - pqsignal(SIGHUP, TrackerSigHupHandler); /* set flag to read config file */ - pqsignal(SIGTERM, TrackerShutdownHandler); /* request shutdown */ - - /* We're now ready to receive signals */ - BackgroundWorkerUnblockSignals(); - - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. - */ - MemoryContext TaskTrackerContext = AllocSetContextCreateExtended(TopMemoryContext, - "Task Tracker", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(TaskTrackerContext); - - /* - * If an exception is encountered, processing resumes here. The motivation - * for this code block is outlined in postgres.c, and the code itself is - * heavily based on bgwriter.c. - * - * In most error scenarios, we will not drop here: the task tracker process - * offloads all work to backend processes, and checks the completion of work - * through the client executor library. We will therefore only come here if - * we have inconsistencies in the shared hash and need to signal an error. - */ - if (sigsetjmp(local_sigjmp_buf, 1) != 0) - { - /* Since we are not using PG_TRY, we must reset error stack by hand */ - error_context_stack = NULL; - - /* Prevents interrupts while cleaning up */ - HOLD_INTERRUPTS(); - - /* Report the error to the server log */ - EmitErrorReport(); - - /* - * These operations are just a minimal subset of AbortTransaction(). - * We do not have many resources to worry about; we only have a shared - * hash and an LWLock guarding that hash. - */ - LWLockReleaseAll(); - AtEOXact_Files(false); - AtEOXact_HashTables(false); - - /* - * Now return to normal top-level context, and clear ErrorContext for - * next time. - */ - MemoryContextSwitchTo(TaskTrackerContext); - FlushErrorState(); - - /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(TaskTrackerContext); - - /* Now we can allow interrupts again */ - RESUME_INTERRUPTS(); - - /* - * Sleep at least 1 second after any error. A write error is likely to - * be repeated, and we don't want to be filling the error logs as fast - * as we can. - */ - pg_usleep(1000000L); - } - - /* We can now handle ereport(ERROR) */ - PG_exception_stack = &local_sigjmp_buf; - - /* - * We run validation and cache cleanup functions as this process is starting - * up. If these functions throw an error, we won't try running them again. - */ - if (processStartUp) - { - processStartUp = false; - - /* clean up old files in the job cache */ - TrackerCleanupJobDirectories(); - - /* clean up schemas in the job cache */ - TrackerCleanupJobSchemas(); - } - - /* Loop forever */ - for (;;) - { - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - * - * XXX: Note that PostgreSQL background processes no longer nap between - * their loops, but instead uses latches to wake up when necessary. We - * should switch to using latches in here too, and have the task tracker - * assign function notify us when there is a new task. - */ - if (!PostmasterIsAlive()) - { - exit(1); - } - - /* Process any requests or signals received recently */ - if (got_SIGHUP) - { - got_SIGHUP = false; - - /* reload postgres configuration files */ - ProcessConfigFile(PGC_SIGHUP); - } - if (got_SIGTERM) - { - /* - * From here on, reporting errors should end with exit(1), and not - * send control back to the sigsetjmp block above. - */ - ExitOnAnyError = true; - - /* Close open connections to local backends */ - TrackerCleanupConnections(TaskTrackerTaskHash); - - /* Add a sentinel task to the shared hash to mark shutdown */ - TrackerRegisterShutDown(TaskTrackerTaskHash); - - /* Normal exit from the task tracker is here */ - proc_exit(0); - } - - /* Call the function that does the actual work */ - ManageWorkerTasksHash(TaskTrackerTaskHash); - - /* Sleep for the configured time */ - TrackerDelayLoop(); - } -} - - -/* - * WorkerTasksHashEnter creates a new worker task in the shared hash, and - * performs checks for this task. Note that the caller still needs to initialize - * the worker task's fields, and hold the appopriate locks for the shared hash. - */ -WorkerTask * -WorkerTasksHashEnter(uint64 jobId, uint32 taskId) -{ - bool handleFound = false; - - WorkerTask searchTask; - searchTask.jobId = jobId; - searchTask.taskId = taskId; - - void *hashKey = (void *) &searchTask; - WorkerTask *workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey, - HASH_ENTER_NULL, &handleFound); - if (workerTask == NULL) - { - ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of shared memory"), - errhint("Try increasing citus.max_tracked_tasks_per_node."))); - } - - /* check that we do not have the same task assigned twice to this node */ - if (handleFound) - { - ereport(ERROR, (errmsg("cannot assign an already assigned task"), - errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u", - jobId, taskId))); - } - - return workerTask; -} - - -/* - * WorkerTasksHashFind looks up the worker task with the given identifiers in - * the shared hash. Note that the caller still needs to hold the appropriate - * locks for the shared hash. - */ -WorkerTask * -WorkerTasksHashFind(uint64 jobId, uint32 taskId) -{ - WorkerTask searchTask; - searchTask.jobId = jobId; - searchTask.taskId = taskId; - - void *hashKey = (void *) &searchTask; - WorkerTask *workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey, - HASH_FIND, NULL); - - return workerTask; -} - - -/* - * TrackerCleanupJobDirectories cleans up all files in the job cache directory - * as part of this process's start-up logic. The task tracker process manages - * both tasks in the shared hash and these tasks' output files. When the task - * tracker needs to shutdown, all shared hash entries are deleted, but the - * associated files cannot be cleaned up safely. We therefore perform this - * cleanup when the process restarts. - */ -void -TrackerCleanupJobDirectories(void) -{ - /* use the default tablespace in {datadir}/base */ - StringInfo jobCacheDirectory = makeStringInfo(); - appendStringInfo(jobCacheDirectory, "base/%s", PG_JOB_CACHE_DIR); - - CitusRemoveDirectory(jobCacheDirectory->data); - CitusCreateDirectory(jobCacheDirectory); - - FreeStringInfo(jobCacheDirectory); -} - - -/* - * TrackerCleanupJobSchemas creates and assigns tasks to remove job schemas and - * all tables within these schemas. These job schemas are currently created by - * merge tasks, and may linger if the database shuts down before the jobs get - * cleaned up. This function then runs during process start-up, and creates one - * task per database to remove lingering job schemas, if any. - */ -static void -TrackerCleanupJobSchemas(void) -{ - /* - * XXX: We previously called DatabaseNameList() to read the list of database - * names here. This function read the database names from the flat database - * file; this file was deprecated on Aug 31, 2009. We hence need to rewrite - * this function to read from pg_database directly. - */ - List *databaseNameList = NIL; - const uint64 jobId = RESERVED_JOB_ID; - uint32 taskIndex = 1; - - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - - const char *databaseName = NULL; - foreach_ptr(databaseName, databaseNameList) - { - /* template0 database does not accept connections */ - int skipDatabaseName = strncmp(databaseName, TEMPLATE0_NAME, NAMEDATALEN); - if (skipDatabaseName == 0) - { - continue; - } - - /* - * We create cleanup tasks since we can't remove schemas within the task - * tracker process. We also assign high priorities to these tasks so - * that they get scheduled before everyone else. - */ - WorkerTask *cleanupTask = WorkerTasksHashEnter(jobId, taskIndex); - cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME; - cleanupTask->taskStatus = TASK_ASSIGNED; - - strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, MaxTaskStringSize); - strlcpy(cleanupTask->databaseName, databaseName, NAMEDATALEN); - - /* zero out all other fields */ - cleanupTask->connectionId = INVALID_CONNECTION_ID; - cleanupTask->failureCount = 0; - - taskIndex++; - } - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - - if (databaseNameList != NIL) - { - list_free_deep(databaseNameList); - } -} - - -/* - * TrackerCleanupConnections closes all open connections to backends during - * process shutdown. This signals to the backends that their connections are - * gone and stops them from logging pipe-related warning messages. - */ -static void -TrackerCleanupConnections(HTAB *WorkerTasksHash) -{ - HASH_SEQ_STATUS status; - - hash_seq_init(&status, WorkerTasksHash); - - WorkerTask *currentTask = (WorkerTask *) hash_seq_search(&status); - while (currentTask != NULL) - { - if (currentTask->connectionId != INVALID_CONNECTION_ID) - { - MultiClientDisconnect(currentTask->connectionId); - currentTask->connectionId = INVALID_CONNECTION_ID; - } - - currentTask = (WorkerTask *) hash_seq_search(&status); - } -} - - -/* - * TrackerRegisterShutDown enters a special marker task to the shared hash. This - * marker task indicates to "task protocol processes" that we are shutting down - * and that they shouldn't accept new task assignments. - */ -static void -TrackerRegisterShutDown(HTAB *WorkerTasksHash) -{ - uint64 jobId = RESERVED_JOB_ID; - uint32 taskId = SHUTDOWN_MARKER_TASK_ID; - - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - - WorkerTask *shutdownMarkerTask = WorkerTasksHashEnter(jobId, taskId); - shutdownMarkerTask->taskStatus = TASK_SUCCEEDED; - shutdownMarkerTask->connectionId = INVALID_CONNECTION_ID; - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); -} - - -/* Sleeps either for the configured time or until a signal is received. */ -static void -TrackerDelayLoop(void) -{ - const long SignalCheckInterval = 1000000L; /* check signal every second */ - - /* - * On some platforms, signals do not interrupt the sleep. To ensure we - * respond promptly when someone signals us, we break down the sleep into - * 1-second increments, and check for interrupts after each nap. - */ - long trackerDelay = TaskTrackerDelay * 1000L; - while (trackerDelay > (SignalCheckInterval - 1)) - { - if (got_SIGHUP || got_SIGTERM) - { - break; - } - pg_usleep(SignalCheckInterval); - trackerDelay -= SignalCheckInterval; - } - if (!(got_SIGHUP || got_SIGTERM)) - { - pg_usleep(trackerDelay); - } -} - - -/* ------------------------------------------------------------ - * Signal handling and shared hash initialization functions follow - * ------------------------------------------------------------ - */ - -/* SIGHUP: set flag to re-read config file at next convenient time */ -static void -TrackerSigHupHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_SIGHUP = true; - if (MyProc != NULL) - { - SetLatch(&MyProc->procLatch); - } - - errno = save_errno; -} - - -/* SIGTERM: set flag for main loop to exit normally */ -static void -TrackerShutdownHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_SIGTERM = true; - if (MyProc != NULL) - { - SetLatch(&MyProc->procLatch); - } - - errno = save_errno; -} - - -/* Estimates the shared memory size used for keeping track of tasks. */ -static Size -TaskTrackerShmemSize(void) -{ - Size size = 0; - - size = add_size(size, sizeof(WorkerTasksSharedStateData)); - - Size hashSize = hash_estimate_size(MaxTrackedTasksPerNode, WORKER_TASK_SIZE); - size = add_size(size, hashSize); - - return size; -} - - -/* Initializes the shared memory used for keeping track of tasks. */ -static void -TaskTrackerShmemInit(void) -{ - bool alreadyInitialized = false; - HASHCTL info; - - long maxTableSize = (long) MaxTrackedTasksPerNode; - long initTableSize = maxTableSize / 8; - - /* - * Allocate the control structure for the hash table that maps unique task - * identifiers (uint64:uint32) to general task information, as well as the - * parameters needed to run the task. - */ - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(uint64) + sizeof(uint32); - info.entrysize = WORKER_TASK_SIZE; - info.hash = tag_hash; - int hashFlags = (HASH_ELEM | HASH_FUNCTION); - - /* - * Currently the lock isn't required because allocation only happens at - * startup in postmaster, but it doesn't hurt, and makes things more - * consistent with other extensions. - */ - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - - /* allocate struct containing task tracker related shared state */ - WorkerTasksSharedState = - (WorkerTasksSharedStateData *) ShmemInitStruct("Worker Task Control", - sizeof(WorkerTasksSharedStateData), - &alreadyInitialized); - - if (!alreadyInitialized) - { - WorkerTasksSharedState->taskHashTrancheId = LWLockNewTrancheId(); - WorkerTasksSharedState->taskHashTrancheName = "Worker Task Hash Tranche"; - LWLockRegisterTranche(WorkerTasksSharedState->taskHashTrancheId, - WorkerTasksSharedState->taskHashTrancheName); - - LWLockInitialize(&WorkerTasksSharedState->taskHashLock, - WorkerTasksSharedState->taskHashTrancheId); - - WorkerTasksSharedState->conninfosValid = true; - } - - /* allocate hash table */ - TaskTrackerTaskHash = ShmemInitHash("Worker Task Hash", initTableSize, maxTableSize, - &info, hashFlags); - - LWLockRelease(AddinShmemInitLock); - - Assert(TaskTrackerTaskHash != NULL); - Assert(WorkerTasksSharedState->taskHashTrancheId != 0); - - if (prev_shmem_startup_hook != NULL) - { - prev_shmem_startup_hook(); - } -} - - -/* ------------------------------------------------------------ - * Task scheduling and management functions follow - * ------------------------------------------------------------ - */ - -/* - * SchedulableTaskList calculates the number of tasks to schedule at this given - * moment, and creates a deep-copied list containing that many tasks. The tasks - * in the list are sorted according to a priority criteria, currently the task's - * assignment time. Note that this function expects the caller to hold a read - * lock over the shared hash. - */ -static List * -SchedulableTaskList(HTAB *WorkerTasksHash) -{ - List *schedulableTaskList = NIL; - - uint32 runningTaskCount = CountTasksMatchingCriteria(WorkerTasksHash, &RunningTask); - if (runningTaskCount >= MaxRunningTasksPerNode) - { - return NIL; /* we already have enough tasks running */ - } - - uint32 schedulableTaskCount = CountTasksMatchingCriteria(WorkerTasksHash, - &SchedulableTask); - if (schedulableTaskCount == 0) - { - return NIL; /* we do not have any new tasks to schedule */ - } - - uint32 tasksToScheduleCount = MaxRunningTasksPerNode - runningTaskCount; - if (tasksToScheduleCount > schedulableTaskCount) - { - tasksToScheduleCount = schedulableTaskCount; - } - - /* get all schedulable tasks ordered according to a priority criteria */ - WorkerTask *schedulableTaskQueue = SchedulableTaskPriorityQueue(WorkerTasksHash); - if (schedulableTaskQueue == NULL) - { - return NIL; - } - - for (uint32 queueIndex = 0; queueIndex < tasksToScheduleCount; queueIndex++) - { - WorkerTask *schedulableTask = (WorkerTask *) palloc0(WORKER_TASK_SIZE); - WorkerTask *queuedTask = WORKER_TASK_AT(schedulableTaskQueue, queueIndex); - schedulableTask->jobId = queuedTask->jobId; - schedulableTask->taskId = queuedTask->taskId; - - schedulableTaskList = lappend(schedulableTaskList, schedulableTask); - } - - /* free priority queue */ - pfree(schedulableTaskQueue); - - return schedulableTaskList; -} - - -/* - * SchedulableTaskPriorityQueue allocates an array containing all schedulable - * tasks in the shared hash, orders these tasks according to a sorting criteria, - * and returns the sorted array. - */ -static WorkerTask * -SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) -{ - HASH_SEQ_STATUS status; - uint32 queueIndex = 0; - - /* our priority queue size equals to the number of schedulable tasks */ - uint32 queueSize = CountTasksMatchingCriteria(WorkerTasksHash, &SchedulableTask); - if (queueSize == 0) - { - return NULL; - } - - /* allocate an array of tasks for our priority queue */ - WorkerTask *priorityQueue = (WorkerTask *) palloc0(WORKER_TASK_SIZE * queueSize); - - /* copy tasks in the shared hash to the priority queue */ - hash_seq_init(&status, WorkerTasksHash); - - WorkerTask *currentTask = (WorkerTask *) hash_seq_search(&status); - while (currentTask != NULL) - { - if (SchedulableTask(currentTask)) - { - /* tasks in the priority queue only need the first three fields */ - WorkerTask *queueTask = WORKER_TASK_AT(priorityQueue, queueIndex); - - queueTask->jobId = currentTask->jobId; - queueTask->taskId = currentTask->taskId; - queueTask->assignedAt = currentTask->assignedAt; - - queueIndex++; - } - - currentTask = (WorkerTask *) hash_seq_search(&status); - } - - /* now order elements in the queue according to our sorting criterion */ - SafeQsort(priorityQueue, queueSize, WORKER_TASK_SIZE, CompareTasksByTime); - - return priorityQueue; -} - - -/* Counts the number of tasks that match the given criteria function. */ -static uint32 -CountTasksMatchingCriteria(HTAB *WorkerTasksHash, - bool (*CriteriaFunction)(WorkerTask *)) -{ - HASH_SEQ_STATUS status; - uint32 taskCount = 0; - - hash_seq_init(&status, WorkerTasksHash); - - WorkerTask *currentTask = (WorkerTask *) hash_seq_search(&status); - while (currentTask != NULL) - { - bool matchesCriteria = (*CriteriaFunction)(currentTask); - if (matchesCriteria) - { - taskCount++; - } - - currentTask = (WorkerTask *) hash_seq_search(&status); - } - - return taskCount; -} - - -/* Checks if the worker task is running. */ -static bool -RunningTask(WorkerTask *workerTask) -{ - TaskStatus currentStatus = workerTask->taskStatus; - if (currentStatus == TASK_RUNNING) - { - return true; - } - - return false; -} - - -/* Checks if the worker task can be scheduled to run. */ -static bool -SchedulableTask(WorkerTask *workerTask) -{ - TaskStatus currentStatus = workerTask->taskStatus; - if (currentStatus == TASK_ASSIGNED) - { - return true; - } - - return false; -} - - -/* Comparison function to compare two worker tasks by their assignment times. */ -static int -CompareTasksByTime(const void *first, const void *second) -{ - WorkerTask *firstTask = (WorkerTask *) first; - WorkerTask *secondTask = (WorkerTask *) second; - - /* tasks that are assigned earlier have higher priority */ - int timeDiff = firstTask->assignedAt - secondTask->assignedAt; - return timeDiff; -} - - -/* - * ScheduleWorkerTasks takes a list of tasks to schedule, and for each task in - * the list, finds and schedules the corresponding task from the shared hash. - * Note that this function expects the caller to hold an exclusive lock over the - * shared hash. - */ -static void -ScheduleWorkerTasks(HTAB *WorkerTasksHash, List *schedulableTaskList) -{ - WorkerTask *schedulableTask = NULL; - foreach_ptr(schedulableTask, schedulableTaskList) - { - void *hashKey = (void *) schedulableTask; - - WorkerTask *taskToSchedule = (WorkerTask *) hash_search(WorkerTasksHash, hashKey, - HASH_FIND, NULL); - - /* if task is null, the shared hash is in an incosistent state */ - if (taskToSchedule == NULL) - { - ereport(ERROR, (errmsg("could not find the worker task to schedule"), - errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u", - schedulableTask->jobId, schedulableTask->taskId))); - } - - /* - * After determining the set of tasks to schedule, we release the hash's - * shared lock for a short time period. We then re-acquire the lock in - * exclusive mode. We therefore need to check if this task has been - * canceled in the meantime. - */ - if (taskToSchedule->taskStatus != TASK_CANCEL_REQUESTED) - { - Assert(SchedulableTask(taskToSchedule)); - - taskToSchedule->taskStatus = TASK_SCHEDULED; - } - else - { - ereport(INFO, (errmsg("the worker task to schedule has been canceled"), - errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u", - schedulableTask->jobId, schedulableTask->taskId))); - } - } -} - - -/* Manages the scheduling and execution of all tasks in the shared hash. */ -static void -ManageWorkerTasksHash(HTAB *WorkerTasksHash) -{ - HASH_SEQ_STATUS status; - - /* ask the scheduler if we have new tasks to schedule */ - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); - List *schedulableTaskList = SchedulableTaskList(WorkerTasksHash); - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - - if (!WorkerTasksSharedState->conninfosValid) - { - InvalidateConnParamsHashEntries(); - } - - /* schedule new tasks if we have any */ - if (schedulableTaskList != NIL) - { - ScheduleWorkerTasks(WorkerTasksHash, schedulableTaskList); - list_free_deep(schedulableTaskList); - } - - /* now iterate over all tasks, and manage them */ - hash_seq_init(&status, WorkerTasksHash); - - WorkerTask *currentTask = (WorkerTask *) hash_seq_search(&status); - while (currentTask != NULL) - { - ManageWorkerTask(currentTask, WorkerTasksHash); - - /* - * Typically, we delete worker tasks in the task tracker protocol - * process. This task however was canceled mid-query, and the protocol - * process asked us to remove it from the shared hash. - */ - if (currentTask->taskStatus == TASK_TO_REMOVE) - { - RemoveWorkerTask(currentTask, WorkerTasksHash); - } - - currentTask = (WorkerTask *) hash_seq_search(&status); - } - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); -} - - -/* - * ManageWorkerTask manages the execution of the worker task. More specifically, - * the function connects to a local backend, sends the query associated with the - * task, and oversees the query's execution. Note that this function expects the - * caller to hold an exclusive lock over the shared hash. - */ -static void -ManageWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash) -{ - switch (workerTask->taskStatus) - { - case TASK_ASSIGNED: - { - break; /* nothing to do until the task gets scheduled */ - } - - case TASK_SCHEDULED: - { - /* create the job output directory if it does not exist */ - CreateJobDirectoryIfNotExists(workerTask->jobId); - - /* the task is ready to run; connect to local backend */ - workerTask->connectionId = ConnectToLocalBackend(workerTask->databaseName, - workerTask->userName); - - if (workerTask->connectionId != INVALID_CONNECTION_ID) - { - bool taskSent = MultiClientSendQuery(workerTask->connectionId, - workerTask->taskCallString); - if (taskSent) - { - workerTask->taskStatus = TASK_RUNNING; - } - else - { - MarkWorkerTaskAsFailed(workerTask); - - MultiClientDisconnect(workerTask->connectionId); - workerTask->connectionId = INVALID_CONNECTION_ID; - } - } - else - { - MarkWorkerTaskAsFailed(workerTask); - } - - break; - } - - case TASK_RUNNING: - { - int32 connectionId = workerTask->connectionId; - ResultStatus resultStatus = MultiClientResultStatus(connectionId); - - /* check if query results are ready, in progress, or unavailable */ - if (resultStatus == CLIENT_RESULT_READY) - { - QueryStatus queryStatus = MultiClientQueryStatus(connectionId); - if (queryStatus == CLIENT_QUERY_DONE) - { - workerTask->taskStatus = TASK_SUCCEEDED; - } - else if (queryStatus == CLIENT_QUERY_FAILED) - { - MarkWorkerTaskAsFailed(workerTask); - } - else - { - ereport(FATAL, (errmsg("invalid query status: %d", queryStatus))); - } - } - else if (resultStatus == CLIENT_RESULT_BUSY) - { - workerTask->taskStatus = TASK_RUNNING; - } - else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) - { - MarkWorkerTaskAsFailed(workerTask); - } - - /* clean up the connection if we are done with the task */ - if (resultStatus != CLIENT_RESULT_BUSY) - { - MultiClientDisconnect(workerTask->connectionId); - workerTask->connectionId = INVALID_CONNECTION_ID; - } - - break; - } - - case TASK_FAILED: - { - if (workerTask->failureCount < MAX_TASK_FAILURE_COUNT) - { - workerTask->taskStatus = TASK_ASSIGNED; - } - else - { - workerTask->taskStatus = TASK_PERMANENTLY_FAILED; - } - - break; - } - - case TASK_PERMANENTLY_FAILED: - case TASK_SUCCEEDED: - { - break; - } - - case TASK_CANCEL_REQUESTED: - { - /* - * If this task is responsible for cleaning up the shared hash, we - * give the task more time instead of canceling it. The reason this - * task is marked for cancellation is that its file descriptor needs - * to be reclaimed after the clean up completes. - */ - if (workerTask->taskId == JOB_CLEANUP_TASK_ID) - { - workerTask->taskStatus = TASK_CANCELED; - break; - } - - if (workerTask->connectionId != INVALID_CONNECTION_ID) - { - int32 connectionId = workerTask->connectionId; - - ResultStatus status = MultiClientResultStatus(connectionId); - if (status == CLIENT_RESULT_BUSY) - { - MultiClientCancel(connectionId); - } - } - - /* give the backend some time to flush its response */ - workerTask->taskStatus = TASK_CANCELED; - break; - } - - case TASK_CANCELED: - { - if (workerTask->connectionId != INVALID_CONNECTION_ID) - { - MultiClientDisconnect(workerTask->connectionId); - workerTask->connectionId = INVALID_CONNECTION_ID; - } - - if (workerTask->taskId == JOB_CLEANUP_TASK_ID) - { - StringInfo jobDirectoryName = JobDirectoryName(workerTask->jobId); - CitusRemoveDirectory(jobDirectoryName->data); - } - - workerTask->taskStatus = TASK_TO_REMOVE; - break; - } - - case TASK_TO_REMOVE: - default: - { - /* we fatal here to avoid leaking client-side resources */ - ereport(FATAL, (errmsg("invalid task status: %d", workerTask->taskStatus))); - break; - } - } - - Assert(workerTask->failureCount <= MAX_TASK_FAILURE_COUNT); -} - - -/* - * MarkWorkerTaskAsFailed marks the given worker task as failed - * and increases the failure count. Failure count is used to - * determine if the task should be marked as permanently failed. - */ -static void -MarkWorkerTaskAsFailed(WorkerTask *workerTask) -{ - workerTask->taskStatus = TASK_FAILED; - workerTask->failureCount++; -} - - -/* Wrapper function to remove the worker task from the shared hash. */ -static void -RemoveWorkerTask(WorkerTask *workerTask, HTAB *WorkerTasksHash) -{ - void *hashKey = (void *) workerTask; - - WorkerTask *taskRemoved = hash_search(WorkerTasksHash, hashKey, HASH_REMOVE, NULL); - if (taskRemoved == NULL) - { - ereport(FATAL, (errmsg("worker task hash corrupted"))); - } -} - - -/* Wrapper function to create the job directory if it does not already exist. */ -static void -CreateJobDirectoryIfNotExists(uint64 jobId) -{ - StringInfo jobDirectoryName = JobDirectoryName(jobId); - - bool jobDirectoryExists = DirectoryExists(jobDirectoryName); - if (!jobDirectoryExists) - { - CitusCreateDirectory(jobDirectoryName); - } - - FreeStringInfo(jobDirectoryName); -} - - -/* Wrapper function to inititate connection to local backend. */ -static int32 -ConnectToLocalBackend(const char *databaseName, const char *userName) -{ - const char *nodeName = LOCAL_HOST_NAME; - const uint32 nodePort = PostPortNumber; - - /* - * Our client library currently only handles TCP sockets. We therefore do - * not use Unix domain sockets here. - */ - int32 connectionId = MultiClientConnect(nodeName, nodePort, databaseName, userName); - - return connectionId; -} diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h index 5bde93313..d207397c2 100644 --- a/src/include/distributed/intermediate_result_pruning.h +++ b/src/include/distributed/intermediate_result_pruning.h @@ -17,6 +17,7 @@ * UINT32_MAX is reserved in pg_dist_node, so we can use it safely. */ #define LOCAL_NODE_ID UINT32_MAX +#define LOCAL_HOST_NAME "localhost" /* connect to local backends using this name */ extern bool LogIntermediateResults; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index f616d2f73..b18a49e00 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -125,45 +125,6 @@ struct TaskExecution }; -/* - * TrackerTaskState represents a task's execution status on a particular task - * tracker. This state augments task execution state in that it is associated - * with execution on a particular task tracker. - */ -typedef struct TrackerTaskState -{ - uint64 jobId; - uint32 taskId; - TaskStatus status; - StringInfo taskAssignmentQuery; -} TrackerTaskState; - - -/* - * TaskTracker keeps connection and task related state for a task tracker. The - * task tracker executor then uses this state to open and manage a connection to - * the task tracker; and assign and check status of tasks over this connection. - */ -typedef struct TaskTracker -{ - uint32 workerPort; /* node's port; part of hash table key */ - char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */ - char *userName; /* which user to connect as */ - TrackerStatus trackerStatus; - int32 connectionId; - uint32 connectPollCount; - uint32 connectionFailureCount; - uint32 trackerFailureCount; - - HTAB *taskStateHash; - List *assignedTaskList; - int32 currentTaskIndex; - bool connectionBusy; - TrackerTaskState *connectionBusyOnTask; - List *connectionBusyOnTaskList; -} TaskTracker; - - /* Config variable managed via guc.c */ extern int RemoteTaskCheckInterval; extern int MaxAssignTaskBatchSize; diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 3c94606af..d13a3c8c1 100644 --- a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -18,118 +18,10 @@ #include "storage/lwlock.h" #include "utils/hsearch.h" - -#define HIGH_PRIORITY_TASK_TIME 1 /* assignment time for high priority tasks */ -#define RESERVED_JOB_ID 1 /* reserved for cleanup and shutdown tasks */ -#define SHUTDOWN_MARKER_TASK_ID UINT_MAX /* used to identify task tracker shutdown */ -#define MAX_TASK_FAILURE_COUNT 2 /* allowed failure count for one task */ -#define LOCAL_HOST_NAME "localhost" /* connect to local backends using this name */ -#define TASK_CALL_STRING_SIZE 12288 /* max length of task call string */ -#define TEMPLATE0_NAME "template0" /* skip job schema cleanup for template0 */ -#define JOB_SCHEMA_CLEANUP "SELECT worker_cleanup_job_schema_cache()" - - -/* - * TaskStatus represents execution status of worker tasks. The assigned and - * cancel requested statuses are set by the master node; all other statuses are - * assigned by the task tracker as the worker task makes progress. - */ -typedef enum -{ - TASK_STATUS_INVALID_FIRST = 0, - TASK_ASSIGNED = 1, /* master node and task tracker */ - TASK_SCHEDULED = 2, - TASK_RUNNING = 3, - TASK_FAILED = 4, - TASK_PERMANENTLY_FAILED = 5, - TASK_SUCCEEDED = 6, - TASK_CANCEL_REQUESTED = 7, /* master node only */ - TASK_CANCELED = 8, - TASK_TO_REMOVE = 9, - - /* - * The master node's executor uses the following statuses to fully represent - * the execution status of worker tasks, as they are perceived by the master - * node. These statuses in fact don't belong with the task tracker. - */ - TASK_CLIENT_SIDE_QUEUED = 10, - TASK_CLIENT_SIDE_ASSIGN_FAILED = 11, - TASK_CLIENT_SIDE_STATUS_FAILED = 12, - TASK_FILE_TRANSMIT_QUEUED = 13, - TASK_CLIENT_SIDE_TRANSMIT_FAILED = 14, - - /* - * Add new task status types above this comment. Existing types, except for - * TASK_STATUS_LAST, should never have their numbers changed. - */ - TASK_STATUS_LAST -} TaskStatus; - - -/* - * WorkerTask keeps shared memory state for tasks. At a high level, each worker - * task holds onto three different types of state: (a) state assigned by the - * master node, (b) state initialized by the protocol process at task assignment - * time, and (c) state internal to the task tracker process that changes as the - * task make progress. - * - * Since taskCallString is dynamically sized use WORKER_TASK_SIZE instead of - * sizeof(WorkerTask). Use WORKER_TASK_AT to reference an item in WorkerTask array. - */ -typedef struct WorkerTask -{ - uint64 jobId; /* job id (upper 32-bits reserved); part of hash table key */ - uint32 taskId; /* task id; part of hash table key */ - uint32 assignedAt; /* task assignment time in epoch seconds */ - - TaskStatus taskStatus; /* task's current execution status */ - char databaseName[NAMEDATALEN]; /* name to use for local backend connection */ - char userName[NAMEDATALEN]; /* user to use for local backend connection */ - int32 connectionId; /* connection id to local backend */ - uint32 failureCount; /* number of task failures */ - char taskCallString[FLEXIBLE_ARRAY_MEMBER]; /* query or function call string */ -} WorkerTask; - -#define WORKER_TASK_SIZE (offsetof(WorkerTask, taskCallString) + MaxTaskStringSize) - -#define WORKER_TASK_AT(workerTasks, index) \ - ((WorkerTask *) (((char *) (workerTasks)) + (index) * WORKER_TASK_SIZE)) - -/* - * WorkerTasksControlData contains task tracker state shared between - * processes. - */ -typedef struct WorkerTasksSharedStateData -{ - /* Lock protecting workerNodesHash */ - int taskHashTrancheId; - char *taskHashTrancheName; - LWLock taskHashLock; - bool conninfosValid; -} WorkerTasksSharedStateData; - - -extern void TrackerCleanupJobDirectories(void); - /* Config variables managed via guc.c */ extern int TaskTrackerDelay; extern int MaxTrackedTasksPerNode; extern int MaxRunningTasksPerNode; extern int MaxTaskStringSize; -/* State shared by the task tracker and task tracker protocol functions */ -extern WorkerTasksSharedStateData *WorkerTasksSharedState; -extern HTAB *TaskTrackerTaskHash; - -/* Entry point */ -extern void TaskTrackerMain(Datum main_arg); - -/* Function declarations local to the worker module */ -extern WorkerTask * WorkerTasksHashEnter(uint64 jobId, uint32 taskId); -extern WorkerTask * WorkerTasksHashFind(uint64 jobId, uint32 taskId); - -/* Function declarations for starting up and running the task tracker */ -extern void TaskTrackerRegister(void); - - #endif /* TASK_TRACKER_H */