From d267e0f9fa79ced1901ca1e4d6a07d9b0dcff15b Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Fri, 3 Nov 2017 15:00:07 -0700 Subject: [PATCH] EXEC_BACKEND: don't put pointers to shared hashes into shared memory Store pointers to shared hashes in process-local variables. Previously pointers to shared hashes were put into shared memory. This causes problems on EXEC_BACKEND because everybody calls execve and receives a brand new address space; the shared hash will be in a different place for every backend. (normally we call fork, which gives you a copy of the address space, so these pointers remain constant) --- src/backend/distributed/utils/maintenanced.c | 28 +++++++++---------- src/backend/distributed/worker/task_tracker.c | 21 +++++++------- .../worker/task_tracker_protocol.c | 5 ++-- src/include/distributed/task_tracker.h | 4 +-- 4 files changed, 27 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 80d4e351e..f2e1a2bd8 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -51,7 +51,7 @@ typedef struct MaintenanceDaemonControlData /* * Lock protecting the shared memory state. This is to be taken when * looking up (shared mode) or inserting (exclusive mode) per-database - * data in dbHash. + * data in MaintenanceDaemonDBHash. */ int trancheId; #if (PG_VERSION_NUM >= 100000) @@ -60,12 +60,6 @@ typedef struct MaintenanceDaemonControlData LWLockTranche lockTranche; #endif LWLock lock; - - /* - * Hash-table of workers, one entry for each database with citus - * activated. - */ - HTAB *dbHash; } MaintenanceDaemonControlData; @@ -90,6 +84,12 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; +/* + * Hash-table of workers, one entry for each database with citus + * activated. + */ +static HTAB *MaintenanceDaemonDBHash; + static volatile sig_atomic_t got_SIGHUP = false; static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); @@ -128,7 +128,7 @@ InitializeMaintenanceDaemonBackend(void) LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); - dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash, + dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash, &MyDatabaseId, HASH_ENTER_NULL, &found); @@ -225,8 +225,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED); myDbData = (MaintenanceDaemonDBData *) - hash_search(MaintenanceDaemonControl->dbHash, &databaseOid, - HASH_FIND, NULL); + hash_search(MaintenanceDaemonDBHash, &databaseOid, HASH_FIND, NULL); if (!myDbData) { /* @@ -530,10 +529,9 @@ MaintenanceDaemonShmemInit(void) hashInfo.hash = tag_hash; hashFlags = (HASH_ELEM | HASH_FUNCTION); - MaintenanceDaemonControl->dbHash = - ShmemInitHash("Maintenance Database Hash", - max_worker_processes, max_worker_processes, - &hashInfo, hashFlags); + MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash", + max_worker_processes, max_worker_processes, + &hashInfo, hashFlags); LWLockRelease(AddinShmemInitLock); @@ -622,7 +620,7 @@ StopMaintenanceDaemon(Oid databaseId) LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); - dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash, + dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash, &databaseId, HASH_REMOVE, &found); if (found) { diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 56ea1b31f..48a6ec6c5 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -56,6 +56,9 @@ int MaxTrackedTasksPerNode = 1024; /* max number of tracked tasks */ int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */ WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */ +/* Hash table shared by the task tracker and task tracker protocol functions */ +HTAB *TaskTrackerTaskHash = NULL; /* shared memory */ + static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* Flags set by interrupt handlers for later service in the main loop */ @@ -246,17 +249,17 @@ TaskTrackerMain(Datum main_arg) ExitOnAnyError = true; /* Close open connections to local backends */ - TrackerCleanupConnections(WorkerTasksSharedState->taskHash); + TrackerCleanupConnections(TaskTrackerTaskHash); /* Add a sentinel task to the shared hash to mark shutdown */ - TrackerRegisterShutDown(WorkerTasksSharedState->taskHash); + TrackerRegisterShutDown(TaskTrackerTaskHash); /* Normal exit from the task tracker is here */ proc_exit(0); } /* Call the function that does the actual work */ - ManageWorkerTasksHash(WorkerTasksSharedState->taskHash); + ManageWorkerTasksHash(TaskTrackerTaskHash); /* Sleep for the configured time */ TrackerDelayLoop(); @@ -281,7 +284,7 @@ WorkerTasksHashEnter(uint64 jobId, uint32 taskId) searchTask.taskId = taskId; hashKey = (void *) &searchTask; - workerTask = (WorkerTask *) hash_search(WorkerTasksSharedState->taskHash, hashKey, + workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey, HASH_ENTER_NULL, &handleFound); if (workerTask == NULL) { @@ -318,7 +321,7 @@ WorkerTasksHashFind(uint64 jobId, uint32 taskId) searchTask.taskId = taskId; hashKey = (void *) &searchTask; - workerTask = (WorkerTask *) hash_search(WorkerTasksSharedState->taskHash, hashKey, + workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey, HASH_FIND, NULL); return workerTask; @@ -601,14 +604,12 @@ TaskTrackerShmemInit(void) } /* allocate hash table */ - WorkerTasksSharedState->taskHash = - ShmemInitHash("Worker Task Hash", - initTableSize, maxTableSize, - &info, hashFlags); + TaskTrackerTaskHash = ShmemInitHash("Worker Task Hash", initTableSize, maxTableSize, + &info, hashFlags); LWLockRelease(AddinShmemInitLock); - Assert(WorkerTasksSharedState->taskHash != NULL); + Assert(TaskTrackerTaskHash != NULL); Assert(WorkerTasksSharedState->taskHashTrancheId != 0); if (prev_shmem_startup_hook != NULL) diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 987944303..a739cbd1f 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -189,7 +189,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) */ LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - hash_seq_init(&status, WorkerTasksSharedState->taskHash); + hash_seq_init(&status, TaskTrackerTaskHash); currentTask = (WorkerTask *) hash_seq_search(&status); while (currentTask != NULL) @@ -415,8 +415,7 @@ CleanupTask(WorkerTask *workerTask) } /* remove the task from the shared hash */ - taskRemoved = hash_search(WorkerTasksSharedState->taskHash, hashKey, HASH_REMOVE, - NULL); + taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE, NULL); if (taskRemoved == NULL) { ereport(FATAL, (errmsg("worker task hash corrupted"))); diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 225e2bdcb..75c8f890a 100644 --- a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -101,9 +101,6 @@ typedef struct WorkerTask */ typedef struct WorkerTasksSharedStateData { - /* Hash table shared by the task tracker and task tracker protocol functions */ - HTAB *taskHash; - /* Lock protecting workerNodesHash */ int taskHashTrancheId; #if (PG_VERSION_NUM >= 100000) @@ -123,6 +120,7 @@ extern int MaxTaskStringSize; /* State shared by the task tracker and task tracker protocol functions */ extern WorkerTasksSharedStateData *WorkerTasksSharedState; +extern HTAB *TaskTrackerTaskHash; /* Entry point */ extern void TaskTrackerMain(Datum main_arg);