mirror of https://github.com/citusdata/citus.git
Move task tracker lwlocks into their own tranche.
RequestAddinLWLocks()/LWLockAssign() are gone in 9.6. Luckily all citus supported postgres versions support tranches, so use those.pull/824/head
parent
d52ebffce5
commit
77efe7fcd4
|
@ -99,7 +99,6 @@ TaskTrackerRegister(void)
|
||||||
|
|
||||||
/* organize and register initialization of required shared memory */
|
/* organize and register initialization of required shared memory */
|
||||||
RequestAddinShmemSpace(TaskTrackerShmemSize());
|
RequestAddinShmemSpace(TaskTrackerShmemSize());
|
||||||
RequestAddinLWLocks(1);
|
|
||||||
|
|
||||||
prev_shmem_startup_hook = shmem_startup_hook;
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
shmem_startup_hook = TaskTrackerShmemInit;
|
shmem_startup_hook = TaskTrackerShmemInit;
|
||||||
|
@ -376,7 +375,7 @@ TrackerCleanupJobSchemas(void)
|
||||||
const uint64 jobId = RESERVED_JOB_ID;
|
const uint64 jobId = RESERVED_JOB_ID;
|
||||||
uint32 taskIndex = 1;
|
uint32 taskIndex = 1;
|
||||||
|
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
foreach(databaseNameCell, databaseNameList)
|
foreach(databaseNameCell, databaseNameList)
|
||||||
{
|
{
|
||||||
|
@ -409,7 +408,7 @@ TrackerCleanupJobSchemas(void)
|
||||||
taskIndex++;
|
taskIndex++;
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
|
|
||||||
if (databaseNameList != NIL)
|
if (databaseNameList != NIL)
|
||||||
{
|
{
|
||||||
|
@ -457,13 +456,13 @@ TrackerRegisterShutDown(HTAB *WorkerTasksHash)
|
||||||
uint32 taskId = SHUTDOWN_MARKER_TASK_ID;
|
uint32 taskId = SHUTDOWN_MARKER_TASK_ID;
|
||||||
WorkerTask *shutdownMarkerTask = NULL;
|
WorkerTask *shutdownMarkerTask = NULL;
|
||||||
|
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
shutdownMarkerTask = WorkerTasksHashEnter(jobId, taskId);
|
shutdownMarkerTask = WorkerTasksHashEnter(jobId, taskId);
|
||||||
shutdownMarkerTask->taskStatus = TASK_SUCCEEDED;
|
shutdownMarkerTask->taskStatus = TASK_SUCCEEDED;
|
||||||
shutdownMarkerTask->connectionId = INVALID_CONNECTION_ID;
|
shutdownMarkerTask->connectionId = INVALID_CONNECTION_ID;
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -587,8 +586,16 @@ TaskTrackerShmemInit(void)
|
||||||
|
|
||||||
if (!alreadyInitialized)
|
if (!alreadyInitialized)
|
||||||
{
|
{
|
||||||
/* allocate lwlock protecting the task tracker hash table */
|
/* initialize lwlock protecting the task tracker hash table */
|
||||||
WorkerTasksSharedState->taskHashLock = LWLockAssign();
|
LWLockTranche *tranche = &WorkerTasksSharedState->taskHashLockTranche;
|
||||||
|
|
||||||
|
WorkerTasksSharedState->taskHashTrancheId = LWLockNewTrancheId();
|
||||||
|
tranche->array_base = &WorkerTasksSharedState->taskHashLock;
|
||||||
|
tranche->array_stride = sizeof(LWLock);
|
||||||
|
tranche->name = "Worker Task Hash Tranche";
|
||||||
|
LWLockRegisterTranche(WorkerTasksSharedState->taskHashTrancheId, tranche);
|
||||||
|
LWLockInitialize(&WorkerTasksSharedState->taskHashLock,
|
||||||
|
WorkerTasksSharedState->taskHashTrancheId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* allocate hash table */
|
/* allocate hash table */
|
||||||
|
@ -600,7 +607,7 @@ TaskTrackerShmemInit(void)
|
||||||
LWLockRelease(AddinShmemInitLock);
|
LWLockRelease(AddinShmemInitLock);
|
||||||
|
|
||||||
Assert(WorkerTasksSharedState->taskHash != NULL);
|
Assert(WorkerTasksSharedState->taskHash != NULL);
|
||||||
Assert(WorkerTasksSharedState->taskHashLock != NULL);
|
Assert(WorkerTasksSharedState->taskHashTrancheId != 0);
|
||||||
|
|
||||||
if (prev_shmem_startup_hook != NULL)
|
if (prev_shmem_startup_hook != NULL)
|
||||||
{
|
{
|
||||||
|
@ -844,11 +851,11 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash)
|
||||||
WorkerTask *currentTask = NULL;
|
WorkerTask *currentTask = NULL;
|
||||||
|
|
||||||
/* ask the scheduler if we have new tasks to schedule */
|
/* ask the scheduler if we have new tasks to schedule */
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
||||||
schedulableTaskList = SchedulableTaskList(WorkerTasksHash);
|
schedulableTaskList = SchedulableTaskList(WorkerTasksHash);
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
|
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/* schedule new tasks if we have any */
|
/* schedule new tasks if we have any */
|
||||||
if (schedulableTaskList != NIL)
|
if (schedulableTaskList != NIL)
|
||||||
|
@ -878,7 +885,7 @@ ManageWorkerTasksHash(HTAB *WorkerTasksHash)
|
||||||
currentTask = (WorkerTask *) hash_seq_search(&status);
|
currentTask = (WorkerTask *) hash_seq_search(&status);
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
|
||||||
UnlockJobResource(jobId, AccessExclusiveLock);
|
UnlockJobResource(jobId, AccessExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/* check if we already have the task in our shared hash */
|
/* check if we already have the task in our shared hash */
|
||||||
workerTask = WorkerTasksHashFind(jobId, taskId);
|
workerTask = WorkerTasksHashFind(jobId, taskId);
|
||||||
|
@ -113,7 +113,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
|
||||||
UpdateTask(workerTask, taskCallString);
|
UpdateTask(workerTask, taskCallString);
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -132,7 +132,7 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
|
||||||
bool taskTrackerRunning = TaskTrackerRunning();
|
bool taskTrackerRunning = TaskTrackerRunning();
|
||||||
if (taskTrackerRunning)
|
if (taskTrackerRunning)
|
||||||
{
|
{
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
||||||
|
|
||||||
workerTask = WorkerTasksHashFind(jobId, taskId);
|
workerTask = WorkerTasksHashFind(jobId, taskId);
|
||||||
if (workerTask == NULL)
|
if (workerTask == NULL)
|
||||||
|
@ -144,7 +144,7 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
taskStatus = (uint32) workerTask->taskStatus;
|
taskStatus = (uint32) workerTask->taskStatus;
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -174,7 +174,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
* We first clean up any open connections, and remove tasks belonging to
|
* We first clean up any open connections, and remove tasks belonging to
|
||||||
* this job from the shared hash.
|
* this job from the shared hash.
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
hash_seq_init(&status, WorkerTasksSharedState->taskHash);
|
hash_seq_init(&status, WorkerTasksSharedState->taskHash);
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
currentTask = (WorkerTask *) hash_seq_search(&status);
|
currentTask = (WorkerTask *) hash_seq_search(&status);
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We then delete the job directory and schema, if they exist. This cleans
|
* We then delete the job directory and schema, if they exist. This cleans
|
||||||
|
@ -233,7 +233,7 @@ TaskTrackerRunning(void)
|
||||||
* marker task to the shared hash. We need to look up this marker task since
|
* marker task to the shared hash. We need to look up this marker task since
|
||||||
* the postmaster doesn't send a terminate signal to running backends.
|
* the postmaster doesn't send a terminate signal to running backends.
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
||||||
|
|
||||||
workerTask = WorkerTasksHashFind(RESERVED_JOB_ID, SHUTDOWN_MARKER_TASK_ID);
|
workerTask = WorkerTasksHashFind(RESERVED_JOB_ID, SHUTDOWN_MARKER_TASK_ID);
|
||||||
if (workerTask != NULL)
|
if (workerTask != NULL)
|
||||||
|
@ -241,7 +241,7 @@ TaskTrackerRunning(void)
|
||||||
taskTrackerRunning = false;
|
taskTrackerRunning = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(WorkerTasksSharedState->taskHashLock);
|
LWLockRelease(&WorkerTasksSharedState->taskHashLock);
|
||||||
|
|
||||||
return taskTrackerRunning;
|
return taskTrackerRunning;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,9 @@ typedef struct WorkerTasksSharedStateData
|
||||||
HTAB *taskHash;
|
HTAB *taskHash;
|
||||||
|
|
||||||
/* Lock protecting workerNodesHash */
|
/* Lock protecting workerNodesHash */
|
||||||
LWLock *taskHashLock;
|
int taskHashTrancheId;
|
||||||
|
LWLockTranche taskHashLockTranche;
|
||||||
|
LWLock taskHashLock;
|
||||||
} WorkerTasksSharedStateData;
|
} WorkerTasksSharedStateData;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue