From 3cb8d467b7223916cb722824de955e5cc06e2c5f Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Tue, 19 May 2020 15:56:13 +0300 Subject: [PATCH] remove unused methods from multi_server_executor --- .../executor/multi_server_executor.c | 131 ------------------ .../distributed/utils/citus_copyfuncs.c | 2 - .../distributed/utils/citus_outfuncs.c | 2 - .../distributed/multi_server_executor.h | 60 -------- 4 files changed, 195 deletions(-) diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 8c008d181..8da107eee 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -164,22 +164,6 @@ HasReplicatedDistributedTable(List *relationOids) return false; } - -/* - * MaxMasterConnectionCount returns the number of connections a master can open. - * A master cannot create more than a certain number of file descriptors (FDs). - * Every task requires 2 FDs, one file and one connection. Some FDs are taken by - * the VFD pool and there is currently no way to reclaim these before opening a - * connection. We therefore assume some FDs to be reserved for VFDs, based on - * observing a typical size of the pool on a Citus master. - */ -int -MaxMasterConnectionCount(void) -{ - return Max((max_files_per_process - RESERVED_FD_COUNT) / 2, 1); -} - - /* * RemoveJobDirectory gets automatically called at portal drop (end of query) or * at transaction abort. The function removes the job directory and releases the @@ -194,121 +178,6 @@ RemoveJobDirectory(uint64 jobId) ResourceOwnerForgetJobDirectory(CurrentResourceOwner, jobId); } - -/* - * InitTaskExecution creates a task execution structure for the given task, and - * initializes execution related fields. - */ -TaskExecution * -InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus) -{ - /* each task placement (assignment) corresponds to one worker node */ - uint32 nodeCount = list_length(task->taskPlacementList); - - TaskExecution *taskExecution = CitusMakeNode(TaskExecution); - - taskExecution->jobId = task->jobId; - taskExecution->taskId = task->taskId; - taskExecution->nodeCount = nodeCount; - taskExecution->currentNodeIndex = 0; - taskExecution->failureCount = 0; - - taskExecution->taskStatusArray = palloc0(nodeCount * sizeof(TaskExecStatus)); - taskExecution->transmitStatusArray = palloc0(nodeCount * sizeof(TransmitExecStatus)); - taskExecution->connectionIdArray = palloc0(nodeCount * sizeof(int32)); - taskExecution->fileDescriptorArray = palloc0(nodeCount * sizeof(int32)); - - for (uint32 nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++) - { - taskExecution->taskStatusArray[nodeIndex] = initialTaskExecStatus; - taskExecution->transmitStatusArray[nodeIndex] = EXEC_TRANSMIT_UNASSIGNED; - taskExecution->connectionIdArray[nodeIndex] = INVALID_CONNECTION_ID; - taskExecution->fileDescriptorArray[nodeIndex] = -1; - } - - return taskExecution; -} - - -/* - * CleanupTaskExecution iterates over all connections and file descriptors for - * the given task execution. The function first closes all open connections and - * file descriptors, and then frees memory allocated for the task execution. - */ -void -CleanupTaskExecution(TaskExecution *taskExecution) -{ - for (uint32 nodeIndex = 0; nodeIndex < taskExecution->nodeCount; nodeIndex++) - { - int32 connectionId = taskExecution->connectionIdArray[nodeIndex]; - int32 fileDescriptor = taskExecution->fileDescriptorArray[nodeIndex]; - - /* close open connection */ - if (connectionId != INVALID_CONNECTION_ID) - { - MultiClientDisconnect(connectionId); - taskExecution->connectionIdArray[nodeIndex] = INVALID_CONNECTION_ID; - } - - /* close open file */ - if (fileDescriptor >= 0) - { - int closed = close(fileDescriptor); - taskExecution->fileDescriptorArray[nodeIndex] = -1; - - if (closed < 0) - { - ereport(WARNING, (errcode_for_file_access(), - errmsg("could not close copy file: %m"))); - } - } - } - - /* deallocate memory and reset all fields */ - pfree(taskExecution->taskStatusArray); - pfree(taskExecution->connectionIdArray); - pfree(taskExecution->fileDescriptorArray); - pfree(taskExecution); -} - - -/* Determines if the given task exceeded its failure threshold. */ -bool -TaskExecutionFailed(TaskExecution *taskExecution) -{ - if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES) - { - return true; - } - - return false; -} - - -/* - * AdjustStateForFailure increments the failure count for given task execution. - * The function also determines the next worker node that should be contacted - * for remote execution. - */ -void -AdjustStateForFailure(TaskExecution *taskExecution) -{ - int maxNodeIndex = taskExecution->nodeCount - 1; - Assert(maxNodeIndex >= 0); - - if (taskExecution->currentNodeIndex < maxNodeIndex) - { - taskExecution->currentNodeIndex++; /* try next worker node */ - } - else - { - taskExecution->currentNodeIndex = 0; /* go back to the first worker node */ - } - - taskExecution->failureCount++; /* record failure */ -} - - /* * CheckIfSizeLimitIsExceeded checks if the limit is exceeded by intermediate * results, if there is any. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 1d2819309..2b61866f5 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -350,8 +350,6 @@ CopyNodeTaskExecution(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(nodeCount); - COPY_SCALAR_ARRAY(taskStatusArray, TaskExecStatus, from->nodeCount); - COPY_SCALAR_ARRAY(transmitStatusArray, TransmitExecStatus, from->nodeCount); COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount); COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index a6515f599..3567501c5 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -561,8 +561,6 @@ OutTaskExecution(OUTFUNC_ARGS) WRITE_UINT_FIELD(taskId); WRITE_UINT_FIELD(nodeCount); - WRITE_ENUM_ARRAY(taskStatusArray, node->nodeCount); - WRITE_ENUM_ARRAY(transmitStatusArray, node->nodeCount); WRITE_INT_ARRAY(connectionIdArray, node->nodeCount); WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount); diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index b18a49e00..2b4195db8 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -18,24 +18,12 @@ #include "distributed/task_tracker.h" #include "distributed/worker_manager.h" - -#define MAX_TASK_EXECUTION_FAILURES 3 /* allowed failure count for one task */ -#define MAX_TRACKER_FAILURE_COUNT 3 /* allowed failure count for one tracker */ -#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */ - /* copy out query results */ #define EXECUTE_SQL_TASK_TO_FILE_BINARY \ "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)" #define EXECUTE_SQL_TASK_TO_FILE_TEXT \ "SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, false)" -/* Task tracker executor related defines */ -#define TASK_ASSIGNMENT_QUERY "SELECT task_tracker_assign_task \ - ("UINT64_FORMAT ", %u, %s);" -#define TASK_STATUS_QUERY "SELECT task_tracker_task_status("UINT64_FORMAT ", %u);" -#define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")" -#define JOB_CLEANUP_TASK_ID INT_MAX - /* Adaptive executor repartioning related defines */ #define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);" #define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \ @@ -43,44 +31,6 @@ ");" -/* Enumeration to track one task's execution status */ -typedef enum -{ - /* used for task tracker executor */ - EXEC_TASK_INVALID_FIRST = 0, - EXEC_TASK_DONE = 1, - EXEC_TASK_UNASSIGNED = 2, - EXEC_TASK_QUEUED = 3, - EXEC_TASK_TRACKER_RETRY = 4, - EXEC_TASK_TRACKER_FAILED = 5, - EXEC_SOURCE_TASK_TRACKER_RETRY = 6, - EXEC_SOURCE_TASK_TRACKER_FAILED = 7, -} TaskExecStatus; - - -/* Enumeration to track file transmits to the master node */ -typedef enum -{ - EXEC_TRANSMIT_INVALID_FIRST = 0, - EXEC_TRANSMIT_UNASSIGNED = 1, - EXEC_TRANSMIT_QUEUED = 2, - EXEC_TRANSMIT_COPYING = 3, - EXEC_TRANSMIT_TRACKER_RETRY = 4, - EXEC_TRANSMIT_TRACKER_FAILED = 5, - EXEC_TRANSMIT_DONE = 6 -} TransmitExecStatus; - - -/* Enumeration to track a task tracker's connection status */ -typedef enum -{ - TRACKER_STATUS_INVALID_FIRST = 0, - TRACKER_CONNECT_START = 1, - TRACKER_CONNECT_POLL = 2, - TRACKER_CONNECTED = 3, - TRACKER_CONNECTION_FAILED = 4 -} TrackerStatus; - /* Enumeration that represents distributed executor types */ typedef enum @@ -114,8 +64,6 @@ struct TaskExecution uint64 jobId; uint32 taskId; - TaskExecStatus *taskStatusArray; - TransmitExecStatus *transmitStatusArray; int32 *connectionIdArray; int32 *fileDescriptorArray; uint32 nodeCount; @@ -134,18 +82,10 @@ extern bool BinaryMasterCopyFormat; extern int MultiTaskQueryLogLevel; -/* Function declarations for distributed execution */ -extern void MultiTaskTrackerExecute(Job *job); - /* Function declarations common to more than one executor */ extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan); extern void RemoveJobDirectory(uint64 jobId); -extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus); extern bool CheckIfSizeLimitIsExceeded(DistributedExecutionStats *executionStats); -extern void CleanupTaskExecution(TaskExecution *taskExecution); extern void ErrorSizeLimitIsExceeded(void); -extern bool TaskExecutionFailed(TaskExecution *taskExecution); -extern void AdjustStateForFailure(TaskExecution *taskExecution); -extern int MaxMasterConnectionCount(void); #endif /* MULTI_SERVER_EXECUTOR_H */