diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 113f6e416..7c3dcb579 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -692,6 +692,19 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_task_string_size", + gettext_noop("Sets the maximum size (in bytes) of a worker task call string."), + gettext_noop("Active worker tasks' are tracked in a shared hash table " + "on the master node. This configuration value limits the " + "maximum size of an individual worker task, and " + "affects the size of pre-allocated shared memory."), + &MaxTaskStringSize, + 12288, 8192, 65536, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); } diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 3b94dfe0f..d88d82eaf 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -53,6 +53,7 @@ int TaskTrackerDelay = 200; /* process sleep interval in millisecs */ int MaxRunningTasksPerNode = 16; /* max number of running tasks */ int MaxTrackedTasksPerNode = 1024; /* max number of tracked tasks */ +int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */ WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; @@ -390,7 +391,7 @@ TrackerCleanupJobSchemas(void) cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME; cleanupTask->taskStatus = TASK_ASSIGNED; - strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE); + strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, MaxTaskStringSize); strlcpy(cleanupTask->databaseName, 
databaseName, NAMEDATALEN); /* zero out all other fields */ @@ -532,7 +533,7 @@ TaskTrackerShmemSize(void) size = add_size(size, sizeof(WorkerTasksSharedStateData)); - hashSize = hash_estimate_size(MaxTrackedTasksPerNode, sizeof(WorkerTask)); + hashSize = hash_estimate_size(MaxTrackedTasksPerNode, WORKER_TASK_SIZE); size = add_size(size, hashSize); return size; @@ -559,7 +560,7 @@ TaskTrackerShmemInit(void) */ memset(&info, 0, sizeof(info)); info.keysize = sizeof(uint64) + sizeof(uint32); - info.entrysize = sizeof(WorkerTask); + info.entrysize = WORKER_TASK_SIZE; info.hash = tag_hash; hashFlags = (HASH_ELEM | HASH_FUNCTION); @@ -662,9 +663,10 @@ SchedulableTaskList(HTAB *WorkerTasksHash) for (queueIndex = 0; queueIndex < tasksToScheduleCount; queueIndex++) { - WorkerTask *schedulableTask = (WorkerTask *) palloc0(sizeof(WorkerTask)); - schedulableTask->jobId = schedulableTaskQueue[queueIndex].jobId; - schedulableTask->taskId = schedulableTaskQueue[queueIndex].taskId; + WorkerTask *schedulableTask = (WorkerTask *) palloc0(WORKER_TASK_SIZE); + WorkerTask *queuedTask = WORKER_TASK_AT(schedulableTaskQueue, queueIndex); + schedulableTask->jobId = queuedTask->jobId; + schedulableTask->taskId = queuedTask->taskId; schedulableTaskList = lappend(schedulableTaskList, schedulableTask); } @@ -698,7 +700,7 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) } /* allocate an array of tasks for our priority queue */ - priorityQueue = (WorkerTask *) palloc0(sizeof(WorkerTask) * queueSize); + priorityQueue = (WorkerTask *) palloc0(WORKER_TASK_SIZE * queueSize); /* copy tasks in the shared hash to the priority queue */ hash_seq_init(&status, WorkerTasksHash); @@ -709,9 +711,11 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) if (SchedulableTask(currentTask)) { /* tasks in the priority queue only need the first three fields */ - priorityQueue[queueIndex].jobId = currentTask->jobId; - priorityQueue[queueIndex].taskId = currentTask->taskId; - 
priorityQueue[queueIndex].assignedAt = currentTask->assignedAt; + WorkerTask *queueTask = WORKER_TASK_AT(priorityQueue, queueIndex); + + queueTask->jobId = currentTask->jobId; + queueTask->taskId = currentTask->taskId; + queueTask->assignedAt = currentTask->assignedAt; queueIndex++; } @@ -720,7 +724,7 @@ SchedulableTaskPriorityQueue(HTAB *WorkerTasksHash) } /* now order elements in the queue according to our sorting criterion */ - qsort(priorityQueue, queueSize, sizeof(WorkerTask), CompareTasksByTime); + qsort(priorityQueue, queueSize, WORKER_TASK_SIZE, CompareTasksByTime); return priorityQueue; } diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 9a8d452d6..2d4daf686 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -80,10 +80,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) } /* check that we have enough space in our shared hash for this string */ - if (taskCallStringLength >= TASK_CALL_STRING_SIZE) + if (taskCallStringLength >= MaxTaskStringSize) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("task call string exceeds maximum assignable length"))); + errmsg("task string length (%d) exceeds maximum assignable " + "size (%d)", taskCallStringLength, MaxTaskStringSize), + errhint("Consider increasing citus.max_task_string_size."))); } /* @@ -333,7 +335,7 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString) /* enter the worker task into shared hash and initialize the task */ workerTask = WorkerTasksHashEnter(jobId, taskId); workerTask->assignedAt = assignmentTime; - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->taskStatus = TASK_ASSIGNED; workerTask->connectionId = INVALID_CONNECTION_ID; @@ -370,13 +372,13 @@ UpdateTask(WorkerTask *workerTask, char 
*taskCallString) } else if (taskStatus == TASK_PERMANENTLY_FAILED) { - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->failureCount = 0; workerTask->taskStatus = TASK_ASSIGNED; } else { - strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); workerTask->failureCount = 0; } } diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 20aa0a9cf..225e2bdcb 100644 --- a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -72,6 +72,9 @@ typedef enum * master node, (b) state initialized by the protocol process at task assignment * time, and (c) state internal to the task tracker process that changes as the * task make progress. + * + * Since taskCallString is dynamically sized use WORKER_TASK_SIZE instead of + * sizeof(WorkerTask). Use WORKER_TASK_AT to reference an item in WorkerTask array. 
*/ typedef struct WorkerTask { @@ -79,14 +82,20 @@ typedef struct WorkerTask uint32 taskId; /* task id; part of hash table key */ uint32 assignedAt; /* task assignment time in epoch seconds */ - char taskCallString[TASK_CALL_STRING_SIZE]; /* query or function call string */ TaskStatus taskStatus; /* task's current execution status */ char databaseName[NAMEDATALEN]; /* name to use for local backend connection */ char userName[NAMEDATALEN]; /* user to use for local backend connection */ int32 connectionId; /* connection id to local backend */ uint32 failureCount; /* number of task failures */ + char taskCallString[FLEXIBLE_ARRAY_MEMBER]; /* query or function call string */ } WorkerTask; +/* round up with MAXALIGN so every element of a WorkerTask array stays aligned */ +#define WORKER_TASK_SIZE \ + MAXALIGN(offsetof(WorkerTask, taskCallString) + MaxTaskStringSize) + +#define WORKER_TASK_AT(workerTasks, index) \ + ((WorkerTask *) (((char *) (workerTasks)) + (index) * WORKER_TASK_SIZE)) /* * WorkerTasksControlData contains task tracker state shared between @@ -112,6 +121,7 @@ typedef struct WorkerTasksSharedStateData extern int TaskTrackerDelay; extern int MaxTrackedTasksPerNode; extern int MaxRunningTasksPerNode; +extern int MaxTaskStringSize; /* State shared by the task tracker and task tracker protocol functions */ extern WorkerTasksSharedStateData *WorkerTasksSharedState;