diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index f0ee43dc4..3f8b994f2 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -99,7 +99,6 @@ TaskTrackerRegister(void) /* organize and register initialization of required shared memory */ RequestAddinShmemSpace(TaskTrackerShmemSize()); - RequestAddinLWLocks(1); prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = TaskTrackerShmemInit; @@ -376,7 +375,7 @@ TrackerCleanupJobSchemas(void) const uint64 jobId = RESERVED_JOB_ID; uint32 taskIndex = 1; - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); foreach(databaseNameCell, databaseNameList) { @@ -409,7 +408,7 @@ TrackerCleanupJobSchemas(void) taskIndex++; } - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); if (databaseNameList != NIL) { @@ -457,13 +456,13 @@ TrackerRegisterShutDown(HTAB *WorkerTasksHash) uint32 taskId = SHUTDOWN_MARKER_TASK_ID; WorkerTask *shutdownMarkerTask = NULL; - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); shutdownMarkerTask = WorkerTasksHashEnter(jobId, taskId); shutdownMarkerTask->taskStatus = TASK_SUCCEEDED; shutdownMarkerTask->connectionId = INVALID_CONNECTION_ID; - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); } @@ -587,8 +586,16 @@ TaskTrackerShmemInit(void) if (!alreadyInitialized) { - /* allocate lwlock protecting the task tracker hash table */ - WorkerTasksSharedState->taskHashLock = LWLockAssign(); + /* initialize lwlock protecting the task tracker hash table */ + LWLockTranche *tranche = &WorkerTasksSharedState->taskHashLockTranche; + + WorkerTasksSharedState->taskHashTrancheId = LWLockNewTrancheId(); + tranche->array_base = &WorkerTasksSharedState->taskHashLock; + tranche->array_stride = sizeof(LWLock); + tranche->name = "Worker Task Hash Tranche"; + LWLockRegisterTranche(WorkerTasksSharedState->taskHashTrancheId, tranche); + LWLockInitialize(&WorkerTasksSharedState->taskHashLock, + WorkerTasksSharedState->taskHashTrancheId); } /* allocate hash table */ @@ -600,7 +607,7 @@ TaskTrackerShmemInit(void) LWLockRelease(AddinShmemInitLock); Assert(WorkerTasksSharedState->taskHash != NULL); - Assert(WorkerTasksSharedState->taskHashLock != NULL); + Assert(WorkerTasksSharedState->taskHashTrancheId != 0); if (prev_shmem_startup_hook != NULL) { @@ -844,11 +851,11 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash) WorkerTask *currentTask = NULL; /* ask the scheduler if we have new tasks to schedule */ - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); schedulableTaskList = SchedulableTaskList(WorkerTasksHash); - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); /* schedule new tasks if we have any */ if (schedulableTaskList != NIL) @@ -878,7 +885,7 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash) currentTask = (WorkerTask *) hash_seq_search(&status); } - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); } diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index d271bfefb..0718487f7 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -100,7 +100,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) UnlockJobResource(jobId, AccessExclusiveLock); } - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); /* check if we already have the task in our shared hash */ workerTask = WorkerTasksHashFind(jobId, taskId); @@ -113,7 +113,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) UpdateTask(workerTask, taskCallString); } - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); PG_RETURN_VOID(); } @@ -132,7 +132,7 @@ task_tracker_task_status(PG_FUNCTION_ARGS) bool taskTrackerRunning = TaskTrackerRunning(); if (taskTrackerRunning) { - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); workerTask = WorkerTasksHashFind(jobId, taskId); if (workerTask == NULL) @@ -144,7 +144,7 @@ task_tracker_task_status(PG_FUNCTION_ARGS) taskStatus = (uint32) workerTask->taskStatus; - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); } else { @@ -174,7 +174,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) * We first clean up any open connections, and remove tasks belonging to * this job from the shared hash. */ - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); hash_seq_init(&status, WorkerTasksSharedState->taskHash); @@ -189,7 +189,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) currentTask = (WorkerTask *) hash_seq_search(&status); } - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); /* * We then delete the job directory and schema, if they exist. This cleans @@ -233,7 +233,7 @@ TaskTrackerRunning(void) * marker task to the shared hash. We need to look up this marker task since * the postmaster doesn't send a terminate signal to running backends. */ - LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED); + LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); workerTask = WorkerTasksHashFind(RESERVED_JOB_ID, SHUTDOWN_MARKER_TASK_ID); if (workerTask != NULL) @@ -241,7 +241,7 @@ TaskTrackerRunning(void) taskTrackerRunning = false; } - LWLockRelease(WorkerTasksSharedState->taskHashLock); + LWLockRelease(&WorkerTasksSharedState->taskHashLock); return taskTrackerRunning; } diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 81fa4fe43..756314b1c 100644 --- a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -98,7 +98,9 @@ typedef struct WorkerTasksSharedStateData HTAB *taskHash; /* Lock protecting workerNodesHash */ - LWLock *taskHashLock; + int taskHashTrancheId; + LWLockTranche taskHashLockTranche; + LWLock taskHashLock; } WorkerTasksSharedStateData;