Address a couple issues with maintenace daemon management:

- Stop the daemon when citus extension is dropped
- Bail on maintenance daemon startup if myDbData is started with a non-zero pid
- Stop maintenance daemon from spawning itself
- Don't use postgres die, just wrap proc_exit(0)
- Assert(myDbData->workerPid == MyProcPid)

The two issues were that multiple daemons could be running for a database,
or that a daemon would be leftover after DROP EXTENSION citus
pull/3522/head
Philip Dubé 2020-02-19 18:20:46 +00:00
parent 6ee82c381e
commit bcf54c5014
6 changed files with 159 additions and 93 deletions

View File

@ -37,7 +37,6 @@ static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
static void EnsureSequentialModeForExtensionDDL(void);
static bool ShouldPropagateExtensionCommand(Node *parseTree);
static bool IsDropCitusStmt(Node *parseTree);
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
static Node * RecreateExtensionStmt(Oid extensionOid);
@ -727,7 +726,7 @@ IsCreateAlterExtensionUpdateCitusStmt(Node *parseTree)
* IsDropCitusStmt iterates the objects to be dropped in a drop statement
* and try to find citus there.
*/
static bool
bool
IsDropCitusStmt(Node *parseTree)
{
ListCell *objectCell = NULL;

View File

@ -452,6 +452,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
}
if (IsDropCitusStmt(parsetree))
{
StopMaintenanceDaemon(MyDatabaseId);
}
pstmt->utilityStmt = parsetree;
PG_TRY();
@ -633,11 +638,14 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PostprocessVacuumStmt(vacuumStmt, queryString);
}
if (!IsDropCitusStmt(parsetree) && !IsA(parsetree, DropdbStmt))
{
/*
* Ensure value is valid, we can't do some checks during CREATE
* EXTENSION. This is important to register some invalidation callbacks.
*/
CitusHasBeenLoaded();
}
}

View File

@ -77,8 +77,8 @@ typedef struct MaintenanceDaemonDBData
/* information: which user to use */
Oid userOid;
bool daemonStarted;
pid_t workerPid;
bool daemonStarted;
bool triggerMetadataSync;
Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;
@ -102,6 +102,7 @@ static HTAB *MaintenanceDaemonDBHash;
static volatile sig_atomic_t got_SIGHUP = false;
static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
@ -154,11 +155,17 @@ InitializeMaintenanceDaemonBackend(void)
ereport(ERROR, (errmsg("ran out of database slots")));
}
/* maintenance daemon can ignore itself */
if (dbData->workerPid == MyProcPid)
{
LWLockRelease(&MaintenanceDaemonControl->lock);
return;
}
if (!found || !dbData->daemonStarted)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
int pid = 0;
dbData->userOid = extensionOwner;
@ -173,13 +180,11 @@ InitializeMaintenanceDaemonBackend(void)
/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Stanby standbys.
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/*
* Restart after a bit after errors, but don't bog the system.
*/
/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
sprintf(worker.bgw_library_name, "citus");
sprintf(worker.bgw_function_name, "CitusMaintenanceDaemonMain");
@ -198,7 +203,10 @@ InitializeMaintenanceDaemonBackend(void)
dbData->triggerMetadataSync = false;
LWLockRelease(&MaintenanceDaemonControl->lock);
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
pfree(handle);
}
else
{
@ -245,13 +253,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)
if (!myDbData || myDbData->workerPid != 0)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
* Alternatively, don't continue if another worker exists.
*/
proc_exit(0);
}
@ -260,7 +269,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
myDbData->workerPid = MyProcPid;
/* wire up signals */
pqsignal(SIGTERM, die);
pqsignal(SIGTERM, MaintenanceDaemonSigTermHandler);
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
BackgroundWorkerUnblockSignals();
@ -300,6 +309,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
CHECK_FOR_INTERRUPTS();
Assert(myDbData->workerPid == MyProcPid);
/*
* XXX: Each task should clear the metadata cache before every iteration
* by calling InvalidateMetadataSystemCache(), because otherwise it
@ -530,6 +541,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
myDbData->daemonStarted = false;
myDbData->workerPid = 0;
LWLockRelease(&MaintenanceDaemonControl->lock);
/* return code of 1 requests worker restart */
@ -630,6 +642,14 @@ MaintenanceDaemonShmemInit(void)
}
/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
proc_exit(0);
}
/*
* MaintenanceDaemonSigHupHandler set a flag to re-read config file at next
* convenient time.
@ -709,6 +729,7 @@ StopMaintenanceDaemon(Oid databaseId)
MaintenanceDaemonDBHash,
&databaseId,
HASH_REMOVE, &found);
if (found)
{
workerPid = dbData->workerPid;

View File

@ -71,6 +71,7 @@ extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_o
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
/* extension.c - forward declarations */
extern bool IsDropCitusStmt(Node *parsetree);
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);

View File

@ -6,7 +6,7 @@
-- It'd be nice to script generation of this file, but alas, that's
-- not done yet.
SET citus.next_shard_id TO 580000;
CREATE SCHEMA test;
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
@ -14,18 +14,23 @@ AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF ;
END LOOP;
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
@ -221,6 +226,45 @@ ALTER EXTENSION citus UPDATE;
(0 rows)
\c - - - :master_port
-- test https://github.com/citusdata/citus/issues/3409
CREATE USER testuser2 SUPERUSER;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SET ROLE testuser2;
DROP EXTENSION Citus;
-- Loop until we see there's no maintenance daemon running
DO $$begin
for i in 0 .. 100 loop
if i = 100 then raise 'Waited too long'; end if;
PERFORM pg_stat_clear_snapshot();
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
if not found then exit; end if;
perform pg_sleep(0.1);
end loop;
end$$;
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
datid | datname | usename
---------------------------------------------------------------------
(0 rows)
CREATE EXTENSION Citus;
-- Loop until we there's a maintenance daemon running
DO $$begin
for i in 0 .. 100 loop
if i = 100 then raise 'Waited too long'; end if;
PERFORM pg_stat_clear_snapshot();
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
if found then exit; end if;
perform pg_sleep(0.1);
end loop;
end$$;
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
datid | datname | usename
---------------------------------------------------------------------
16384 | regression | testuser2
(1 row)
RESET ROLE;
-- check that maintenance daemon gets (re-)started for the right user
DROP EXTENSION citus;
CREATE USER testuser SUPERUSER;
@ -246,26 +290,8 @@ HINT: You can manually create a database and its extensions on workers.
\c another
CREATE EXTENSION citus;
CREATE SCHEMA test;
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF ;
END LOOP;
END;
$$;
-- see that the deamon started
:create_function_test_maintenance_worker
-- see that the daemon started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
@ -276,12 +302,12 @@ FROM test.maintenance_worker();
-- Test that database with active worker can be dropped.
\c regression
CREATE SCHEMA test_deamon;
CREATE SCHEMA test_daemon;
-- we create a similar function on the regression database
-- note that this function checks for the existence of the daemon
-- when not found, returns true else tries for 5 times and
-- returns false
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
CREATE OR REPLACE FUNCTION test_daemon.maintenance_daemon_died(p_dbname text)
RETURNS boolean
LANGUAGE plpgsql
AS $$
@ -289,7 +315,6 @@ DECLARE
activity record;
BEGIN
PERFORM pg_stat_clear_snapshot();
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NULL THEN
@ -297,23 +322,22 @@ BEGIN
ELSE
RETURN false;
END IF;
END LOOP;
END;
$$;
-- drop the database and see that the deamon is dead
-- drop the database and see that the daemon is dead
DROP DATABASE another;
SELECT
*
FROM
test_deamon.maintenance_deamon_died('another');
maintenance_deamon_died
test_daemon.maintenance_daemon_died('another');
maintenance_daemon_died
---------------------------------------------------------------------
t
(1 row)
-- we don't need the schema and the function anymore
DROP SCHEMA test_deamon CASCADE;
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
DROP SCHEMA test_daemon CASCADE;
NOTICE: drop cascades to function test_daemon.maintenance_daemon_died(text)
-- verify citus does not crash while creating a table when run against an older worker
-- create_distributed_table piggybacks multiple commands into single one, if one worker
-- did not have the required UDF it should fail instead of crash.

View File

@ -9,8 +9,7 @@
SET citus.next_shard_id TO 580000;
CREATE SCHEMA test;
SELECT $definition$
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
@ -18,18 +17,24 @@ AS $$
DECLARE
activity record;
BEGIN
DO 'BEGIN END'; -- Force maintenance daemon to start
LOOP
PERFORM pg_stat_clear_snapshot();
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF ;
END LOOP;
END;
$$;
$definition$ create_function_test_maintenance_worker
\gset
CREATE SCHEMA test;
:create_function_test_maintenance_worker
-- check maintenance daemon is started
SELECT datname, current_database(),
@ -207,6 +212,35 @@ ALTER EXTENSION citus UPDATE;
\c - - - :master_port
-- test https://github.com/citusdata/citus/issues/3409
CREATE USER testuser2 SUPERUSER;
SET ROLE testuser2;
DROP EXTENSION Citus;
-- Loop until we see there's no maintenance daemon running
DO $$begin
for i in 0 .. 100 loop
if i = 100 then raise 'Waited too long'; end if;
PERFORM pg_stat_clear_snapshot();
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
if not found then exit; end if;
perform pg_sleep(0.1);
end loop;
end$$;
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
CREATE EXTENSION Citus;
-- Loop until we there's a maintenance daemon running
DO $$begin
for i in 0 .. 100 loop
if i = 100 then raise 'Waited too long'; end if;
PERFORM pg_stat_clear_snapshot();
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
if found then exit; end if;
perform pg_sleep(0.1);
end loop;
end$$;
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
RESET ROLE;
-- check that maintenance daemon gets (re-)started for the right user
DROP EXTENSION citus;
CREATE USER testuser SUPERUSER;
@ -229,28 +263,9 @@ CREATE DATABASE another;
CREATE EXTENSION citus;
CREATE SCHEMA test;
:create_function_test_maintenance_worker
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF ;
END LOOP;
END;
$$;
-- see that the deamon started
-- see that the daemon started
SELECT datname, current_database(),
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
@ -258,13 +273,13 @@ FROM test.maintenance_worker();
-- Test that database with active worker can be dropped.
\c regression
CREATE SCHEMA test_deamon;
CREATE SCHEMA test_daemon;
-- we create a similar function on the regression database
-- note that this function checks for the existence of the daemon
-- when not found, returns true else tries for 5 times and
-- returns false
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
CREATE OR REPLACE FUNCTION test_daemon.maintenance_daemon_died(p_dbname text)
RETURNS boolean
LANGUAGE plpgsql
AS $$
@ -272,7 +287,6 @@ DECLARE
activity record;
BEGIN
PERFORM pg_stat_clear_snapshot();
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NULL THEN
@ -280,19 +294,18 @@ BEGIN
ELSE
RETURN false;
END IF;
END LOOP;
END;
$$;
-- drop the database and see that the deamon is dead
-- drop the database and see that the daemon is dead
DROP DATABASE another;
SELECT
*
FROM
test_deamon.maintenance_deamon_died('another');
test_daemon.maintenance_daemon_died('another');
-- we don't need the schema and the function anymore
DROP SCHEMA test_deamon CASCADE;
DROP SCHEMA test_daemon CASCADE;
-- verify citus does not crash while creating a table when run against an older worker