mirror of https://github.com/citusdata/citus.git
Merge pull request #1592 from citusdata/improve_maintanince_deamon
Terminate bg worker on drop database
pull/1603/head
commit
734aeebc47
|
@ -31,11 +31,13 @@
|
||||||
#include "citus_version.h"
|
#include "citus_version.h"
|
||||||
#include "catalog/pg_constraint.h"
|
#include "catalog/pg_constraint.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "commands/prepare.h"
|
#include "commands/prepare.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
@ -454,6 +456,19 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
" necessary users and roles.")));
|
" necessary users and roles.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure that on DROP DATABASE we terminate the background deamon
|
||||||
|
* associated with it.
|
||||||
|
*/
|
||||||
|
if (IsA(parsetree, DropdbStmt))
|
||||||
|
{
|
||||||
|
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
|
||||||
|
char *dbname = dropDbStatement->dbname;
|
||||||
|
Oid databaseOid = get_database_oid(dbname, false);
|
||||||
|
|
||||||
|
StopMaintenanceDaemon(databaseOid);
|
||||||
|
}
|
||||||
|
|
||||||
/* set user if needed and go ahead and run local utility using standard hook */
|
/* set user if needed and go ahead and run local utility using standard hook */
|
||||||
if (commandMustRunAsOwner)
|
if (commandMustRunAsOwner)
|
||||||
{
|
{
|
||||||
|
|
|
@ -24,9 +24,11 @@
|
||||||
#include "catalog/pg_extension.h"
|
#include "catalog/pg_extension.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "libpq/pqsignal.h"
|
#include "libpq/pqsignal.h"
|
||||||
|
#include "catalog/namespace.h"
|
||||||
#include "distributed/distributed_deadlock_detection.h"
|
#include "distributed/distributed_deadlock_detection.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "nodes/makefuncs.h"
|
||||||
#include "postmaster/bgworker.h"
|
#include "postmaster/bgworker.h"
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/proc.h"
|
#include "storage/proc.h"
|
||||||
|
@ -73,6 +75,7 @@ typedef struct MaintenanceDaemonDBData
|
||||||
/* information: which user to use */
|
/* information: which user to use */
|
||||||
Oid userOid;
|
Oid userOid;
|
||||||
bool daemonStarted;
|
bool daemonStarted;
|
||||||
|
pid_t workerPid;
|
||||||
Latch *latch; /* pointer to the background worker's latch */
|
Latch *latch; /* pointer to the background worker's latch */
|
||||||
} MaintenanceDaemonDBData;
|
} MaintenanceDaemonDBData;
|
||||||
|
|
||||||
|
@ -170,6 +173,7 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
dbData->daemonStarted = true;
|
dbData->daemonStarted = true;
|
||||||
|
dbData->workerPid = 0;
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||||
|
@ -225,11 +229,19 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
*/
|
*/
|
||||||
proc_exit(0);
|
proc_exit(0);
|
||||||
}
|
}
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
|
||||||
|
|
||||||
|
/* from this point, DROP DATABASE will attempt to kill the worker */
|
||||||
|
myDbData->workerPid = MyProcPid;
|
||||||
|
|
||||||
|
/* wire up signals */
|
||||||
|
pqsignal(SIGTERM, die);
|
||||||
|
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
|
||||||
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
myDbData->latch = MyLatch;
|
myDbData->latch = MyLatch;
|
||||||
|
|
||||||
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Setup error context so log messages can be properly attributed. Some of
|
* Setup error context so log messages can be properly attributed. Some of
|
||||||
* them otherwise sound like they might be from a normal user connection.
|
* them otherwise sound like they might be from a normal user connection.
|
||||||
|
@ -242,10 +254,6 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
errorCallback.previous = error_context_stack;
|
errorCallback.previous = error_context_stack;
|
||||||
error_context_stack = &errorCallback;
|
error_context_stack = &errorCallback;
|
||||||
|
|
||||||
/* wire up signals */
|
|
||||||
pqsignal(SIGTERM, die);
|
|
||||||
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
|
|
||||||
BackgroundWorkerUnblockSignals();
|
|
||||||
|
|
||||||
elog(LOG, "starting maintenance daemon on database %u user %u",
|
elog(LOG, "starting maintenance daemon on database %u user %u",
|
||||||
databaseOid, myDbData->userOid);
|
databaseOid, myDbData->userOid);
|
||||||
|
@ -286,6 +294,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
{
|
{
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We skip the deadlock detection if citus extension
|
||||||
|
* is not accessible.
|
||||||
|
*
|
||||||
|
* Similarly, we skip to run the deadlock checks if
|
||||||
|
* there exists any version mismatch or the extension
|
||||||
|
* is not fully created yet.
|
||||||
|
*/
|
||||||
if (!LockCitusExtension())
|
if (!LockCitusExtension())
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||||
|
@ -519,3 +535,33 @@ LockCitusExtension(void)
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StopMaintenanceDaemon stops the maintenance daemon for the
|
||||||
|
* given database and removes it from the maintenance daemon
|
||||||
|
* control hash.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
StopMaintenanceDaemon(Oid databaseId)
|
||||||
|
{
|
||||||
|
bool found = false;
|
||||||
|
MaintenanceDaemonDBData *dbData = NULL;
|
||||||
|
pid_t workerPid = 0;
|
||||||
|
|
||||||
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash,
|
||||||
|
&databaseId, HASH_REMOVE, &found);
|
||||||
|
if (found)
|
||||||
|
{
|
||||||
|
workerPid = dbData->workerPid;
|
||||||
|
}
|
||||||
|
|
||||||
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
|
if (workerPid > 0)
|
||||||
|
{
|
||||||
|
kill(workerPid, SIGTERM);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
/* config variable for */
|
/* config variable for */
|
||||||
extern double DistributedDeadlockDetectionTimeoutFactor;
|
extern double DistributedDeadlockDetectionTimeoutFactor;
|
||||||
|
|
||||||
|
extern void StopMaintenanceDaemon(Oid databaseId);
|
||||||
extern void InitializeMaintenanceDaemon(void);
|
extern void InitializeMaintenanceDaemon(void);
|
||||||
extern void InitializeMaintenanceDaemonBackend(void);
|
extern void InitializeMaintenanceDaemonBackend(void);
|
||||||
|
|
||||||
|
|
|
@ -278,6 +278,7 @@ BEGIN
|
||||||
END LOOP;
|
END LOOP;
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
-- see that the deamon started
|
||||||
SELECT datname,
|
SELECT datname,
|
||||||
datname = current_database(),
|
datname = current_database(),
|
||||||
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
||||||
|
@ -287,15 +288,43 @@ FROM test.maintenance_worker();
|
||||||
another | t | t
|
another | t | t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test that database with active worker can be dropped. That'll
|
-- Test that database with active worker can be dropped.
|
||||||
-- require killing the maintenance worker.
|
|
||||||
\c regression
|
\c regression
|
||||||
SELECT datname,
|
CREATE SCHEMA test_deamon;
|
||||||
pg_terminate_backend(pid)
|
-- we create a similar function on the regression database
|
||||||
FROM test.maintenance_worker('another');
|
-- note that this function checks for the existence of the daemon
|
||||||
datname | pg_terminate_backend
|
-- when not found, returns true; otherwise returns false
|
||||||
---------+----------------------
|
-- (the check runs once and is not retried)
|
||||||
another | t
|
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
    RETURNS boolean
    LANGUAGE plpgsql
AS $$
DECLARE
   activity record;
BEGIN
    -- discard the cached statistics snapshot so the query below sees the
    -- current set of backends
    PERFORM pg_stat_clear_snapshot();

    -- single check, no retry: look for a maintenance daemon backend
    -- attached to the given database
    SELECT * INTO activity FROM pg_stat_activity
    WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;

    -- when no row matched, every field of the record is NULL
    RETURN activity.pid IS NULL;
END;
$$;
|
||||||
|
-- drop the database and see that the deamon is dead
|
||||||
|
DROP DATABASE another;
|
||||||
|
SELECT
|
||||||
|
*
|
||||||
|
FROM
|
||||||
|
test_deamon.maintenance_deamon_died('another');
|
||||||
|
maintenance_deamon_died
|
||||||
|
-------------------------
|
||||||
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP DATABASE another;
|
-- we don't need the schema and the function anymore
|
||||||
|
DROP SCHEMA test_deamon CASCADE;
|
||||||
|
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
|
||||||
|
|
|
@ -262,15 +262,47 @@ BEGIN
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
|
-- see that the deamon started
|
||||||
SELECT datname,
|
SELECT datname,
|
||||||
datname = current_database(),
|
datname = current_database(),
|
||||||
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
||||||
FROM test.maintenance_worker();
|
FROM test.maintenance_worker();
|
||||||
|
|
||||||
-- Test that database with active worker can be dropped. That'll
|
-- Test that database with active worker can be dropped.
|
||||||
-- require killing the maintenance worker.
|
|
||||||
\c regression
|
\c regression
|
||||||
SELECT datname,
|
|
||||||
pg_terminate_backend(pid)
|
CREATE SCHEMA test_deamon;
|
||||||
FROM test.maintenance_worker('another');
|
|
||||||
|
-- we create a similar function on the regression database
|
||||||
|
-- note that this function checks for the existence of the daemon
|
||||||
|
-- when not found, returns true; otherwise returns false
|
||||||
|
-- (the check runs once and is not retried)
|
||||||
|
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
    RETURNS boolean
    LANGUAGE plpgsql
AS $$
DECLARE
   activity record;
BEGIN
    -- discard the cached statistics snapshot so the query below sees the
    -- current set of backends
    PERFORM pg_stat_clear_snapshot();

    -- single check, no retry: look for a maintenance daemon backend
    -- attached to the given database
    SELECT * INTO activity FROM pg_stat_activity
    WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;

    -- when no row matched, every field of the record is NULL
    RETURN activity.pid IS NULL;
END;
$$;
|
||||||
|
|
||||||
|
-- drop the database and see that the deamon is dead
|
||||||
DROP DATABASE another;
|
DROP DATABASE another;
|
||||||
|
SELECT
|
||||||
|
*
|
||||||
|
FROM
|
||||||
|
test_deamon.maintenance_deamon_died('another');
|
||||||
|
|
||||||
|
-- we don't need the schema and the function anymore
|
||||||
|
DROP SCHEMA test_deamon CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue