EXEC_BACKEND: don't put pointers to shared hashes into shared memory

Store pointers to shared hashes in process-local variables. Previously
pointers to shared hashes were put into shared memory. This causes
problems on EXEC_BACKEND because everybody calls execve and receives a
brand new address space; the shared hash will be in a different place
for every backend. (normally we call fork, which gives you a copy of the
address space, so these pointers remain constant)
pull/1816/head
Brian Cloutier 2017-11-03 15:00:07 -07:00 committed by Brian Cloutier
parent 30a2365d81
commit d267e0f9fa
4 changed files with 27 additions and 31 deletions

View File

@ -51,7 +51,7 @@ typedef struct MaintenanceDaemonControlData
/* /*
* Lock protecting the shared memory state. This is to be taken when * Lock protecting the shared memory state. This is to be taken when
* looking up (shared mode) or inserting (exclusive mode) per-database * looking up (shared mode) or inserting (exclusive mode) per-database
* data in dbHash. * data in MaintenanceDaemonDBHash.
*/ */
int trancheId; int trancheId;
#if (PG_VERSION_NUM >= 100000) #if (PG_VERSION_NUM >= 100000)
@ -60,12 +60,6 @@ typedef struct MaintenanceDaemonControlData
LWLockTranche lockTranche; LWLockTranche lockTranche;
#endif #endif
LWLock lock; LWLock lock;
/*
* Hash-table of workers, one entry for each database with citus
* activated.
*/
HTAB *dbHash;
} MaintenanceDaemonControlData; } MaintenanceDaemonControlData;
@ -90,6 +84,12 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
/*
* Hash-table of workers, one entry for each database with citus
* activated.
*/
static HTAB *MaintenanceDaemonDBHash;
static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGHUP = false;
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
@ -128,7 +128,7 @@ InitializeMaintenanceDaemonBackend(void)
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash, dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
&MyDatabaseId, &MyDatabaseId,
HASH_ENTER_NULL, &found); HASH_ENTER_NULL, &found);
@ -225,8 +225,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED);
myDbData = (MaintenanceDaemonDBData *) myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonControl->dbHash, &databaseOid, hash_search(MaintenanceDaemonDBHash, &databaseOid, HASH_FIND, NULL);
HASH_FIND, NULL);
if (!myDbData) if (!myDbData)
{ {
/* /*
@ -530,10 +529,9 @@ MaintenanceDaemonShmemInit(void)
hashInfo.hash = tag_hash; hashInfo.hash = tag_hash;
hashFlags = (HASH_ELEM | HASH_FUNCTION); hashFlags = (HASH_ELEM | HASH_FUNCTION);
MaintenanceDaemonControl->dbHash = MaintenanceDaemonDBHash = ShmemInitHash("Maintenance Database Hash",
ShmemInitHash("Maintenance Database Hash", max_worker_processes, max_worker_processes,
max_worker_processes, max_worker_processes, &hashInfo, hashFlags);
&hashInfo, hashFlags);
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
@ -622,7 +620,7 @@ StopMaintenanceDaemon(Oid databaseId)
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash, dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash,
&databaseId, HASH_REMOVE, &found); &databaseId, HASH_REMOVE, &found);
if (found) if (found)
{ {

View File

@ -56,6 +56,9 @@ int MaxTrackedTasksPerNode = 1024; /* max number of tracked tasks */
int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */ int MaxTaskStringSize = 12288; /* max size of a worker task call string in bytes */
WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */ WorkerTasksSharedStateData *WorkerTasksSharedState; /* shared memory state */
/* Hash table shared by the task tracker and task tracker protocol functions */
HTAB *TaskTrackerTaskHash = NULL; /* shared memory */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* Flags set by interrupt handlers for later service in the main loop */ /* Flags set by interrupt handlers for later service in the main loop */
@ -246,17 +249,17 @@ TaskTrackerMain(Datum main_arg)
ExitOnAnyError = true; ExitOnAnyError = true;
/* Close open connections to local backends */ /* Close open connections to local backends */
TrackerCleanupConnections(WorkerTasksSharedState->taskHash); TrackerCleanupConnections(TaskTrackerTaskHash);
/* Add a sentinel task to the shared hash to mark shutdown */ /* Add a sentinel task to the shared hash to mark shutdown */
TrackerRegisterShutDown(WorkerTasksSharedState->taskHash); TrackerRegisterShutDown(TaskTrackerTaskHash);
/* Normal exit from the task tracker is here */ /* Normal exit from the task tracker is here */
proc_exit(0); proc_exit(0);
} }
/* Call the function that does the actual work */ /* Call the function that does the actual work */
ManageWorkerTasksHash(WorkerTasksSharedState->taskHash); ManageWorkerTasksHash(TaskTrackerTaskHash);
/* Sleep for the configured time */ /* Sleep for the configured time */
TrackerDelayLoop(); TrackerDelayLoop();
@ -281,7 +284,7 @@ WorkerTasksHashEnter(uint64 jobId, uint32 taskId)
searchTask.taskId = taskId; searchTask.taskId = taskId;
hashKey = (void *) &searchTask; hashKey = (void *) &searchTask;
workerTask = (WorkerTask *) hash_search(WorkerTasksSharedState->taskHash, hashKey, workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey,
HASH_ENTER_NULL, &handleFound); HASH_ENTER_NULL, &handleFound);
if (workerTask == NULL) if (workerTask == NULL)
{ {
@ -318,7 +321,7 @@ WorkerTasksHashFind(uint64 jobId, uint32 taskId)
searchTask.taskId = taskId; searchTask.taskId = taskId;
hashKey = (void *) &searchTask; hashKey = (void *) &searchTask;
workerTask = (WorkerTask *) hash_search(WorkerTasksSharedState->taskHash, hashKey, workerTask = (WorkerTask *) hash_search(TaskTrackerTaskHash, hashKey,
HASH_FIND, NULL); HASH_FIND, NULL);
return workerTask; return workerTask;
@ -601,14 +604,12 @@ TaskTrackerShmemInit(void)
} }
/* allocate hash table */ /* allocate hash table */
WorkerTasksSharedState->taskHash = TaskTrackerTaskHash = ShmemInitHash("Worker Task Hash", initTableSize, maxTableSize,
ShmemInitHash("Worker Task Hash", &info, hashFlags);
initTableSize, maxTableSize,
&info, hashFlags);
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
Assert(WorkerTasksSharedState->taskHash != NULL); Assert(TaskTrackerTaskHash != NULL);
Assert(WorkerTasksSharedState->taskHashTrancheId != 0); Assert(WorkerTasksSharedState->taskHashTrancheId != 0);
if (prev_shmem_startup_hook != NULL) if (prev_shmem_startup_hook != NULL)

View File

@ -189,7 +189,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
*/ */
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE);
hash_seq_init(&status, WorkerTasksSharedState->taskHash); hash_seq_init(&status, TaskTrackerTaskHash);
currentTask = (WorkerTask *) hash_seq_search(&status); currentTask = (WorkerTask *) hash_seq_search(&status);
while (currentTask != NULL) while (currentTask != NULL)
@ -415,8 +415,7 @@ CleanupTask(WorkerTask *workerTask)
} }
/* remove the task from the shared hash */ /* remove the task from the shared hash */
taskRemoved = hash_search(WorkerTasksSharedState->taskHash, hashKey, HASH_REMOVE, taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE, NULL);
NULL);
if (taskRemoved == NULL) if (taskRemoved == NULL)
{ {
ereport(FATAL, (errmsg("worker task hash corrupted"))); ereport(FATAL, (errmsg("worker task hash corrupted")));

View File

@ -101,9 +101,6 @@ typedef struct WorkerTask
*/ */
typedef struct WorkerTasksSharedStateData typedef struct WorkerTasksSharedStateData
{ {
/* Hash table shared by the task tracker and task tracker protocol functions */
HTAB *taskHash;
/* Lock protecting workerNodesHash */ /* Lock protecting workerNodesHash */
int taskHashTrancheId; int taskHashTrancheId;
#if (PG_VERSION_NUM >= 100000) #if (PG_VERSION_NUM >= 100000)
@ -123,6 +120,7 @@ extern int MaxTaskStringSize;
/* State shared by the task tracker and task tracker protocol functions */ /* State shared by the task tracker and task tracker protocol functions */
extern WorkerTasksSharedStateData *WorkerTasksSharedState; extern WorkerTasksSharedStateData *WorkerTasksSharedState;
extern HTAB *TaskTrackerTaskHash;
/* Entry point */ /* Entry point */
extern void TaskTrackerMain(Datum main_arg); extern void TaskTrackerMain(Datum main_arg);