Remove task_tracker.c methods

pull/3850/head
Sait Talha Nisanci 2020-05-19 15:20:46 +03:00
parent 26a0ee2d9b
commit 34fb13f3c8
6 changed files with 2 additions and 1199 deletions

View File

@ -273,9 +273,6 @@ _PG_init(void)
InitializeMaintenanceDaemon();
/* organize that task tracker is started once server is up */
TaskTrackerRegister();
/* initialize coordinated transaction management */
InitializeTransactionManagement();
InitializeBackendManagement();

View File

@ -18,6 +18,7 @@
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/function_utils.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/lock_graph.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@
* UINT32_MAX is reserved in pg_dist_node, so we can use it safely.
*/
#define LOCAL_NODE_ID UINT32_MAX
#define LOCAL_HOST_NAME "localhost" /* connect to local backends using this name */
extern bool LogIntermediateResults;

View File

@ -125,45 +125,6 @@ struct TaskExecution
};
/*
* TrackerTaskState represents a task's execution status on a particular task
* tracker. This state augments task execution state in that it is associated
* with execution on a particular task tracker.
*/
typedef struct TrackerTaskState
{
uint64 jobId;
uint32 taskId;
TaskStatus status;
StringInfo taskAssignmentQuery;
} TrackerTaskState;
/*
* TaskTracker keeps connection and task related state for a task tracker. The
* task tracker executor then uses this state to open and manage a connection to
* the task tracker; and assign and check status of tasks over this connection.
*/
typedef struct TaskTracker
{
uint32 workerPort; /* node's port; part of hash table key */
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
char *userName; /* which user to connect as */
TrackerStatus trackerStatus;
int32 connectionId;
uint32 connectPollCount;
uint32 connectionFailureCount;
uint32 trackerFailureCount;
HTAB *taskStateHash;
List *assignedTaskList;
int32 currentTaskIndex;
bool connectionBusy;
TrackerTaskState *connectionBusyOnTask;
List *connectionBusyOnTaskList;
} TaskTracker;
/* Config variable managed via guc.c */
extern int RemoteTaskCheckInterval;
extern int MaxAssignTaskBatchSize;

View File

@ -18,118 +18,10 @@
#include "storage/lwlock.h"
#include "utils/hsearch.h"
#define HIGH_PRIORITY_TASK_TIME 1 /* assignment time for high priority tasks */
#define RESERVED_JOB_ID 1 /* reserved for cleanup and shutdown tasks */
#define SHUTDOWN_MARKER_TASK_ID UINT_MAX /* used to identify task tracker shutdown */
#define MAX_TASK_FAILURE_COUNT 2 /* allowed failure count for one task */
#define LOCAL_HOST_NAME "localhost" /* connect to local backends using this name */
#define TASK_CALL_STRING_SIZE 12288 /* max length of task call string */
#define TEMPLATE0_NAME "template0" /* skip job schema cleanup for template0 */
#define JOB_SCHEMA_CLEANUP "SELECT worker_cleanup_job_schema_cache()"
/*
* TaskStatus represents execution status of worker tasks. The assigned and
* cancel requested statuses are set by the master node; all other statuses are
* assigned by the task tracker as the worker task makes progress.
*/
typedef enum
{
TASK_STATUS_INVALID_FIRST = 0,
TASK_ASSIGNED = 1, /* master node and task tracker */
TASK_SCHEDULED = 2,
TASK_RUNNING = 3,
TASK_FAILED = 4,
TASK_PERMANENTLY_FAILED = 5,
TASK_SUCCEEDED = 6,
TASK_CANCEL_REQUESTED = 7, /* master node only */
TASK_CANCELED = 8,
TASK_TO_REMOVE = 9,
/*
* The master node's executor uses the following statuses to fully represent
* the execution status of worker tasks, as they are perceived by the master
* node. These statuses in fact don't belong with the task tracker.
*/
TASK_CLIENT_SIDE_QUEUED = 10,
TASK_CLIENT_SIDE_ASSIGN_FAILED = 11,
TASK_CLIENT_SIDE_STATUS_FAILED = 12,
TASK_FILE_TRANSMIT_QUEUED = 13,
TASK_CLIENT_SIDE_TRANSMIT_FAILED = 14,
/*
* Add new task status types above this comment. Existing types, except for
* TASK_STATUS_LAST, should never have their numbers changed.
*/
TASK_STATUS_LAST
} TaskStatus;
/*
* WorkerTask keeps shared memory state for tasks. At a high level, each worker
* task holds onto three different types of state: (a) state assigned by the
* master node, (b) state initialized by the protocol process at task assignment
* time, and (c) state internal to the task tracker process that changes as the
* task make progress.
*
* Since taskCallString is dynamically sized use WORKER_TASK_SIZE instead of
* sizeof(WorkerTask). Use WORKER_TASK_AT to reference an item in WorkerTask array.
*/
typedef struct WorkerTask
{
uint64 jobId; /* job id (upper 32-bits reserved); part of hash table key */
uint32 taskId; /* task id; part of hash table key */
uint32 assignedAt; /* task assignment time in epoch seconds */
TaskStatus taskStatus; /* task's current execution status */
char databaseName[NAMEDATALEN]; /* name to use for local backend connection */
char userName[NAMEDATALEN]; /* user to use for local backend connection */
int32 connectionId; /* connection id to local backend */
uint32 failureCount; /* number of task failures */
char taskCallString[FLEXIBLE_ARRAY_MEMBER]; /* query or function call string */
} WorkerTask;
#define WORKER_TASK_SIZE (offsetof(WorkerTask, taskCallString) + MaxTaskStringSize)
#define WORKER_TASK_AT(workerTasks, index) \
((WorkerTask *) (((char *) (workerTasks)) + (index) * WORKER_TASK_SIZE))
/*
* WorkerTasksControlData contains task tracker state shared between
* processes.
*/
typedef struct WorkerTasksSharedStateData
{
/* Lock protecting workerNodesHash */
int taskHashTrancheId;
char *taskHashTrancheName;
LWLock taskHashLock;
bool conninfosValid;
} WorkerTasksSharedStateData;
extern void TrackerCleanupJobDirectories(void);
/* Config variables managed via guc.c */
extern int TaskTrackerDelay;
extern int MaxTrackedTasksPerNode;
extern int MaxRunningTasksPerNode;
extern int MaxTaskStringSize;
/* State shared by the task tracker and task tracker protocol functions */
extern WorkerTasksSharedStateData *WorkerTasksSharedState;
extern HTAB *TaskTrackerTaskHash;
/* Entry point */
extern void TaskTrackerMain(Datum main_arg);
/* Function declarations local to the worker module */
extern WorkerTask * WorkerTasksHashEnter(uint64 jobId, uint32 taskId);
extern WorkerTask * WorkerTasksHashFind(uint64 jobId, uint32 taskId);
/* Function declarations for starting up and running the task tracker */
extern void TaskTrackerRegister(void);
#endif /* TASK_TRACKER_H */