From 26f020dc6e7f829a60bc81f7df79be0446615eff Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Thu, 27 Jul 2017 11:39:12 -0700 Subject: [PATCH] Make maxTaskStringSize configurable (#1501) maxTaskStringSize determines the size of the worker query string. It was originally hard-coded to a specific value, which has caused issues for some users. Since it determines the initial shared memory allocation, we did not want to set it to an arbitrarily higher number. Instead, we made it configurable. This commit introduces a new GUC variable, citus.max_task_string_size. Changes to this variable require a restart to take effect. --- src/backend/distributed/shared_library_init.c | 13 ++++++++++ src/backend/distributed/worker/task_tracker.c | 26 +++++++++++-------- .../worker/task_tracker_protocol.c | 12 +++++---- src/include/distributed/task_tracker.h | 10 ++++++- 4 files changed, 44 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 113f6e416..7c3dcb579 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -692,6 +692,19 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_task_string_size", + gettext_noop("Sets the maximum size (in bytes) of a worker task call string."), + gettext_noop("Active worker tasks' are tracked in a shared hash table " + "on the master node. 
This configuration value limits the " + "maximum size of an individual worker task, and " + "affects the size of pre-allocated shared memory."), + &MaxTaskStringSize, + 12288, 8192, 65536, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); } diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 3b94dfe0f..d88d82eaf 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -53,6 +53,7 @@ int TaskTrackerDelay = 200; /* process sleep interval in millisecs */ int MaxRunningTasksPerNode = 16; /* max number of running tasks */ int MaxTrackedTasksPerNode = 1024; /* max number of tracked tasks */ +int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */ WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; @@ -390,7 +391,7 @@ TrackerCleanupJobSchemas(void) cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME; cleanupTask->taskStatus = TASK_ASSIGNED; - strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE); + strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, MaxTaskStringSize); strlcpy(cleanupTask->databaseName, databaseName, NAMEDATALEN); /* zero out all other fields */ @@ -532,7 +533,7 @@ TaskTrackerShmemSize(void) size = add_size(size, sizeof(WorkerTasksSharedStateData)); - hashSize = hash_estimate_size(MaxTrackedTasksPerNode, sizeof(WorkerTask)); + hashSize = hash_estimate_size(MaxTrackedTasksPerNode, WORKER_TASK_SIZE); size = add_size(size, hashSize); return size; @@ -559,7 +560,7 @@ TaskTrackerShmemInit(void) */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(uint64) + sizeof(uint32); - info.entrysize = sizeof(WorkerTask); + info.entrysize = WORKER_TASK_SIZE; info.hash = tag_hash; hashFlags = 
(HASH_ELEM | HASH_FUNCTION); @@ -662,9 +663,10 @@ SchedulableTaskList(HTAB *WorkerTasksHash) for (queueIndex = 0; queueIndex < tasksToScheduleCount; queueIndex++) { - WorkerTask *schedulableTask = (WorkerTask *) palloc0(sizeof(WorkerTask)); - schedulableTask->jobId = schedulableTaskQueue[queueIndex].jobId; - schedulableTask->taskId = schedulableTaskQueue[queueIndex].taskId; + WorkerTask *schedulableTask = (WorkerTask *) palloc0(WORKER_TASK_SIZE); + WorkerTask *queuedTask = WORKER_TASK_AT(schedulableTaskQueue, queueIndex); + schedulableTask->jobId = queuedTask->jobId; + schedulableTask->taskId = queuedTask->taskId; schedulableTaskList = lappend(schedulableTaskList, schedulableTask); } @@ -698,7 +700,7 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) } /* allocate an array of tasks for our priority queue */ - priorityQueue = (WorkerTask *) palloc0(sizeof(WorkerTask) * queueSize); + priorityQueue = (WorkerTask *) palloc0(WORKER_TASK_SIZE * queueSize); /* copy tasks in the shared hash to the priority queue */ hash_seq_init(&status, WorkerTasksHash); @@ -709,9 +711,11 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) if (SchedulableTask(currentTask)) { /* tasks in the priority queue only need the first three fields */ - priorityQueue[queueIndex].jobId = currentTask->jobId; - priorityQueue[queueIndex].taskId = currentTask->taskId; - priorityQueue[queueIndex].assignedAt = currentTask->assignedAt; + WorkerTask *queueTask = WORKER_TASK_AT(priorityQueue, queueIndex); + + queueTask->jobId = currentTask->jobId; + queueTask->taskId = currentTask->taskId; + queueTask->assignedAt = currentTask->assignedAt; queueIndex++; } @@ -720,7 +724,7 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) } /* now order elements in the queue according to our sorting criterion */ - qsort(priorityQueue, queueSize, sizeof(WorkerTask), CompareTasksByTime); + qsort(priorityQueue, queueSize, WORKER_TASK_SIZE, CompareTasksByTime); return priorityQueue; } diff --git 
a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 9a8d452d6..2d4daf686 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -80,10 +80,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) } /* check that we have enough space in our shared hash for this string */ - if (taskCallStringLength >= TASK_CALL_STRING_SIZE) + if (taskCallStringLength >= MaxTaskStringSize) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("task call string exceeds maximum assignable length"))); + errmsg("task string length (%d) exceeds maximum assignable " + "size (%d)", taskCallStringLength, MaxTaskStringSize), + errhint("Consider increasing citus.max_task_string_size."))); } /* @@ -333,7 +335,7 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString) /* enter the worker task into shared hash and initialize the task */ workerTask = WorkerTasksHashEnter(jobId, taskId); workerTask->assignedAt = assignmentTime; - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->taskStatus = TASK_ASSIGNED; workerTask->connectionId = INVALID_CONNECTION_ID; @@ -370,13 +372,13 @@ UpdateTask(WorkerTask *workerTask, char *taskCallString) } else if (taskStatus == TASK_PERMANENTLY_FAILED) { - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->failureCount = 0; workerTask->taskStatus = TASK_ASSIGNED; } else { - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->failureCount = 0; } } diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 20aa0a9cf..225e2bdcb 100644 --- 
a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -72,6 +72,9 @@ typedef enum * master node, (b) state initialized by the protocol process at task assignment * time, and (c) state internal to the task tracker process that changes as the * task make progress. + * + * Since taskCallString is dynamically sized use WORKER_TASK_SIZE instead of + * sizeof(WorkerTask). Use WORKER_TASK_AT to reference an item in WorkerTask array. */ typedef struct WorkerTask { @@ -79,14 +82,18 @@ typedef struct WorkerTask uint32 taskId; /* task id; part of hash table key */ uint32 assignedAt; /* task assignment time in epoch seconds */ - char taskCallString[TASK_CALL_STRING_SIZE]; /* query or function call string */ TaskStatus taskStatus; /* task's current execution status */ char databaseName[NAMEDATALEN]; /* name to use for local backend connection */ char userName[NAMEDATALEN]; /* user to use for local backend connection */ int32 connectionId; /* connection id to local backend */ uint32 failureCount; /* number of task failures */ + char taskCallString[FLEXIBLE_ARRAY_MEMBER]; /* query or function call string */ } WorkerTask; +#define WORKER_TASK_SIZE (offsetof(WorkerTask, taskCallString) + MaxTaskStringSize) + +#define WORKER_TASK_AT(workerTasks, index) \ + ((WorkerTask *) (((char *) (workerTasks)) + (index) * WORKER_TASK_SIZE)) /* * WorkerTasksControlData contains task tracker state shared between @@ -112,6 +119,7 @@ typedef struct WorkerTasksSharedStateData extern int TaskTrackerDelay; extern int MaxTrackedTasksPerNode; extern int MaxRunningTasksPerNode; +extern int MaxTaskStringSize; /* State shared by the task tracker and task tracker protocol functions */ extern WorkerTasksSharedStateData *WorkerTasksSharedState;