mirror of https://github.com/citusdata/citus.git
remove unused methods from multi_server_executor
parent
34fb13f3c8
commit
3cb8d467b7
|
@ -164,22 +164,6 @@ HasReplicatedDistributedTable(List *relationOids)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* MaxMasterConnectionCount returns the number of connections a master can open.
|
|
||||||
* A master cannot create more than a certain number of file descriptors (FDs).
|
|
||||||
* Every task requires 2 FDs, one file and one connection. Some FDs are taken by
|
|
||||||
* the VFD pool and there is currently no way to reclaim these before opening a
|
|
||||||
* connection. We therefore assume some FDs to be reserved for VFDs, based on
|
|
||||||
* observing a typical size of the pool on a Citus master.
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
MaxMasterConnectionCount(void)
|
|
||||||
{
|
|
||||||
return Max((max_files_per_process - RESERVED_FD_COUNT) / 2, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoveJobDirectory gets automatically called at portal drop (end of query) or
|
* RemoveJobDirectory gets automatically called at portal drop (end of query) or
|
||||||
* at transaction abort. The function removes the job directory and releases the
|
* at transaction abort. The function removes the job directory and releases the
|
||||||
|
@ -194,121 +178,6 @@ RemoveJobDirectory(uint64 jobId)
|
||||||
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId);
|
ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InitTaskExecution creates a task execution structure for the given task, and
|
|
||||||
* initializes execution related fields.
|
|
||||||
*/
|
|
||||||
TaskExecution *
|
|
||||||
InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
|
|
||||||
{
|
|
||||||
/* each task placement (assignment) corresponds to one worker node */
|
|
||||||
uint32 nodeCount = list_length(task->taskPlacementList);
|
|
||||||
|
|
||||||
TaskExecution *taskExecution = CitusMakeNode(TaskExecution);
|
|
||||||
|
|
||||||
taskExecution->jobId = task->jobId;
|
|
||||||
taskExecution->taskId = task->taskId;
|
|
||||||
taskExecution->nodeCount = nodeCount;
|
|
||||||
taskExecution->currentNodeIndex = 0;
|
|
||||||
taskExecution->failureCount = 0;
|
|
||||||
|
|
||||||
taskExecution->taskStatusArray = palloc0(nodeCount * sizeof(TaskExecStatus));
|
|
||||||
taskExecution->transmitStatusArray = palloc0(nodeCount * sizeof(TransmitExecStatus));
|
|
||||||
taskExecution->connectionIdArray = palloc0(nodeCount * sizeof(int32));
|
|
||||||
taskExecution->fileDescriptorArray = palloc0(nodeCount * sizeof(int32));
|
|
||||||
|
|
||||||
for (uint32 nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++)
|
|
||||||
{
|
|
||||||
taskExecution->taskStatusArray[nodeIndex] = initialTaskExecStatus;
|
|
||||||
taskExecution->transmitStatusArray[nodeIndex] = EXEC_TRANSMIT_UNASSIGNED;
|
|
||||||
taskExecution->connectionIdArray[nodeIndex] = INVALID_CONNECTION_ID;
|
|
||||||
taskExecution->fileDescriptorArray[nodeIndex] = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return taskExecution;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CleanupTaskExecution iterates over all connections and file descriptors for
|
|
||||||
* the given task execution. The function first closes all open connections and
|
|
||||||
* file descriptors, and then frees memory allocated for the task execution.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
CleanupTaskExecution(TaskExecution *taskExecution)
|
|
||||||
{
|
|
||||||
for (uint32 nodeIndex = 0; nodeIndex < taskExecution->nodeCount; nodeIndex++)
|
|
||||||
{
|
|
||||||
int32 connectionId = taskExecution->connectionIdArray[nodeIndex];
|
|
||||||
int32 fileDescriptor = taskExecution->fileDescriptorArray[nodeIndex];
|
|
||||||
|
|
||||||
/* close open connection */
|
|
||||||
if (connectionId != INVALID_CONNECTION_ID)
|
|
||||||
{
|
|
||||||
MultiClientDisconnect(connectionId);
|
|
||||||
taskExecution->connectionIdArray[nodeIndex] = INVALID_CONNECTION_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* close open file */
|
|
||||||
if (fileDescriptor >= 0)
|
|
||||||
{
|
|
||||||
int closed = close(fileDescriptor);
|
|
||||||
taskExecution->fileDescriptorArray[nodeIndex] = -1;
|
|
||||||
|
|
||||||
if (closed < 0)
|
|
||||||
{
|
|
||||||
ereport(WARNING, (errcode_for_file_access(),
|
|
||||||
errmsg("could not close copy file: %m")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* deallocate memory and reset all fields */
|
|
||||||
pfree(taskExecution->taskStatusArray);
|
|
||||||
pfree(taskExecution->connectionIdArray);
|
|
||||||
pfree(taskExecution->fileDescriptorArray);
|
|
||||||
pfree(taskExecution);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Determines if the given task exceeded its failure threshold. */
|
|
||||||
bool
|
|
||||||
TaskExecutionFailed(TaskExecution *taskExecution)
|
|
||||||
{
|
|
||||||
if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AdjustStateForFailure increments the failure count for given task execution.
|
|
||||||
* The function also determines the next worker node that should be contacted
|
|
||||||
* for remote execution.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
AdjustStateForFailure(TaskExecution *taskExecution)
|
|
||||||
{
|
|
||||||
int maxNodeIndex = taskExecution->nodeCount - 1;
|
|
||||||
Assert(maxNodeIndex >= 0);
|
|
||||||
|
|
||||||
if (taskExecution->currentNodeIndex < maxNodeIndex)
|
|
||||||
{
|
|
||||||
taskExecution->currentNodeIndex++; /* try next worker node */
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
taskExecution->currentNodeIndex = 0; /* go back to the first worker node */
|
|
||||||
}
|
|
||||||
|
|
||||||
taskExecution->failureCount++; /* record failure */
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate
|
* CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate
|
||||||
* results, if there is any.
|
* results, if there is any.
|
||||||
|
|
|
@ -350,8 +350,6 @@ CopyNodeTaskExecution(COPYFUNC_ARGS)
|
||||||
COPY_SCALAR_FIELD(taskId);
|
COPY_SCALAR_FIELD(taskId);
|
||||||
COPY_SCALAR_FIELD(nodeCount);
|
COPY_SCALAR_FIELD(nodeCount);
|
||||||
|
|
||||||
COPY_SCALAR_ARRAY(taskStatusArray, TaskExecStatus, from->nodeCount);
|
|
||||||
COPY_SCALAR_ARRAY(transmitStatusArray, TransmitExecStatus, from->nodeCount);
|
|
||||||
COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount);
|
COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount);
|
||||||
COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount);
|
COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount);
|
||||||
|
|
||||||
|
|
|
@ -561,8 +561,6 @@ OutTaskExecution(OUTFUNC_ARGS)
|
||||||
WRITE_UINT_FIELD(taskId);
|
WRITE_UINT_FIELD(taskId);
|
||||||
WRITE_UINT_FIELD(nodeCount);
|
WRITE_UINT_FIELD(nodeCount);
|
||||||
|
|
||||||
WRITE_ENUM_ARRAY(taskStatusArray, node->nodeCount);
|
|
||||||
WRITE_ENUM_ARRAY(transmitStatusArray, node->nodeCount);
|
|
||||||
WRITE_INT_ARRAY(connectionIdArray, node->nodeCount);
|
WRITE_INT_ARRAY(connectionIdArray, node->nodeCount);
|
||||||
WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount);
|
WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount);
|
||||||
|
|
||||||
|
|
|
@ -18,24 +18,12 @@
|
||||||
#include "distributed/task_tracker.h"
|
#include "distributed/task_tracker.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
|
|
||||||
|
|
||||||
#define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */
|
|
||||||
#define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */
|
|
||||||
#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */
|
|
||||||
|
|
||||||
/* copy out query results */
|
/* copy out query results */
|
||||||
#define EXECUTE_SQL_TASK_TO_FILE_BINARY \
|
#define EXECUTE_SQL_TASK_TO_FILE_BINARY \
|
||||||
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)"
|
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)"
|
||||||
#define EXECUTE_SQL_TASK_TO_FILE_TEXT \
|
#define EXECUTE_SQL_TASK_TO_FILE_TEXT \
|
||||||
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)"
|
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)"
|
||||||
|
|
||||||
/* Task tracker executor related defines */
|
|
||||||
#define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \
|
|
||||||
("UINT64_FORMAT ", %u, %s);"
|
|
||||||
#define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u);"
|
|
||||||
#define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")"
|
|
||||||
#define JOB_CLEANUP_TASK_ID INT_MAX
|
|
||||||
|
|
||||||
/* Adaptive executor repartioning related defines */
|
/* Adaptive executor repartioning related defines */
|
||||||
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);"
|
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);"
|
||||||
#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \
|
#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \
|
||||||
|
@ -43,44 +31,6 @@
|
||||||
");"
|
");"
|
||||||
|
|
||||||
|
|
||||||
/* Enumeration to track one task's execution status */
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
/* used for task tracker executor */
|
|
||||||
EXEC_TASK_INVALID_FIRST = 0,
|
|
||||||
EXEC_TASK_DONE = 1,
|
|
||||||
EXEC_TASK_UNASSIGNED = 2,
|
|
||||||
EXEC_TASK_QUEUED = 3,
|
|
||||||
EXEC_TASK_TRACKER_RETRY = 4,
|
|
||||||
EXEC_TASK_TRACKER_FAILED = 5,
|
|
||||||
EXEC_SOURCE_TASK_TRACKER_RETRY = 6,
|
|
||||||
EXEC_SOURCE_TASK_TRACKER_FAILED = 7,
|
|
||||||
} TaskExecStatus;
|
|
||||||
|
|
||||||
|
|
||||||
/* Enumeration to track file transmits to the master node */
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
EXEC_TRANSMIT_INVALID_FIRST = 0,
|
|
||||||
EXEC_TRANSMIT_UNASSIGNED = 1,
|
|
||||||
EXEC_TRANSMIT_QUEUED = 2,
|
|
||||||
EXEC_TRANSMIT_COPYING = 3,
|
|
||||||
EXEC_TRANSMIT_TRACKER_RETRY = 4,
|
|
||||||
EXEC_TRANSMIT_TRACKER_FAILED = 5,
|
|
||||||
EXEC_TRANSMIT_DONE = 6
|
|
||||||
} TransmitExecStatus;
|
|
||||||
|
|
||||||
|
|
||||||
/* Enumeration to track a task tracker's connection status */
|
|
||||||
typedef enum
|
|
||||||
{
|
|
||||||
TRACKER_STATUS_INVALID_FIRST = 0,
|
|
||||||
TRACKER_CONNECT_START = 1,
|
|
||||||
TRACKER_CONNECT_POLL = 2,
|
|
||||||
TRACKER_CONNECTED = 3,
|
|
||||||
TRACKER_CONNECTION_FAILED = 4
|
|
||||||
} TrackerStatus;
|
|
||||||
|
|
||||||
|
|
||||||
/* Enumeration that represents distributed executor types */
|
/* Enumeration that represents distributed executor types */
|
||||||
typedef enum
|
typedef enum
|
||||||
|
@ -114,8 +64,6 @@ struct TaskExecution
|
||||||
uint64 jobId;
|
uint64 jobId;
|
||||||
uint32 taskId;
|
uint32 taskId;
|
||||||
|
|
||||||
TaskExecStatus *taskStatusArray;
|
|
||||||
TransmitExecStatus *transmitStatusArray;
|
|
||||||
int32 *connectionIdArray;
|
int32 *connectionIdArray;
|
||||||
int32 *fileDescriptorArray;
|
int32 *fileDescriptorArray;
|
||||||
uint32 nodeCount;
|
uint32 nodeCount;
|
||||||
|
@ -134,18 +82,10 @@ extern bool BinaryMasterCopyFormat;
|
||||||
extern int MultiTaskQueryLogLevel;
|
extern int MultiTaskQueryLogLevel;
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations for distributed execution */
|
|
||||||
extern void MultiTaskTrackerExecute(Job *job);
|
|
||||||
|
|
||||||
/* Function declarations common to more than one executor */
|
/* Function declarations common to more than one executor */
|
||||||
extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
|
extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
|
||||||
extern void RemoveJobDirectory(uint64 jobId);
|
extern void RemoveJobDirectory(uint64 jobId);
|
||||||
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);
|
|
||||||
extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
|
extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats);
|
||||||
extern void CleanupTaskExecution(TaskExecution *taskExecution);
|
|
||||||
extern void ErrorSizeLimitIsExceeded(void);
|
extern void ErrorSizeLimitIsExceeded(void);
|
||||||
extern bool TaskExecutionFailed(TaskExecution *taskExecution);
|
|
||||||
extern void AdjustStateForFailure(TaskExecution *taskExecution);
|
|
||||||
extern int MaxMasterConnectionCount(void);
|
|
||||||
|
|
||||||
#endif /* MULTI_SERVER_EXECUTOR_H */
|
#endif /* MULTI_SERVER_EXECUTOR_H */
|
||||||
|
|
Loading…
Reference in New Issue