mirror of https://github.com/citusdata/citus.git
Merge pull request #3522 from citusdata/fix-flaky-multi-extension-2
Address a couple issues with maintenace daemon managementpull/3536/head
commit
c291fd5d11
|
@ -37,7 +37,6 @@ static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
|
||||||
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
|
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
|
||||||
static void EnsureSequentialModeForExtensionDDL(void);
|
static void EnsureSequentialModeForExtensionDDL(void);
|
||||||
static bool ShouldPropagateExtensionCommand(Node *parseTree);
|
static bool ShouldPropagateExtensionCommand(Node *parseTree);
|
||||||
static bool IsDropCitusStmt(Node *parseTree);
|
|
||||||
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
|
static bool IsAlterExtensionSetSchemaCitus(Node *parseTree);
|
||||||
static Node * RecreateExtensionStmt(Oid extensionOid);
|
static Node * RecreateExtensionStmt(Oid extensionOid);
|
||||||
|
|
||||||
|
@ -727,7 +726,7 @@ IsCreateAlterExtensionUpdateCitusStmt(Node *parseTree)
|
||||||
* IsDropCitusStmt iterates the objects to be dropped in a drop statement
|
* IsDropCitusStmt iterates the objects to be dropped in a drop statement
|
||||||
* and try to find citus there.
|
* and try to find citus there.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
IsDropCitusStmt(Node *parseTree)
|
IsDropCitusStmt(Node *parseTree)
|
||||||
{
|
{
|
||||||
ListCell *objectCell = NULL;
|
ListCell *objectCell = NULL;
|
||||||
|
|
|
@ -452,6 +452,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsDropCitusStmt(parsetree))
|
||||||
|
{
|
||||||
|
StopMaintenanceDaemon(MyDatabaseId);
|
||||||
|
}
|
||||||
|
|
||||||
pstmt->utilityStmt = parsetree;
|
pstmt->utilityStmt = parsetree;
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
|
@ -633,11 +638,14 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
PostprocessVacuumStmt(vacuumStmt, queryString);
|
PostprocessVacuumStmt(vacuumStmt, queryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
if (!IsDropCitusStmt(parsetree) && !IsA(parsetree, DropdbStmt))
|
||||||
* Ensure value is valid, we can't do some checks during CREATE
|
{
|
||||||
* EXTENSION. This is important to register some invalidation callbacks.
|
/*
|
||||||
*/
|
* Ensure value is valid, we can't do some checks during CREATE
|
||||||
CitusHasBeenLoaded();
|
* EXTENSION. This is important to register some invalidation callbacks.
|
||||||
|
*/
|
||||||
|
CitusHasBeenLoaded();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -77,8 +77,8 @@ typedef struct MaintenanceDaemonDBData
|
||||||
|
|
||||||
/* information: which user to use */
|
/* information: which user to use */
|
||||||
Oid userOid;
|
Oid userOid;
|
||||||
bool daemonStarted;
|
|
||||||
pid_t workerPid;
|
pid_t workerPid;
|
||||||
|
bool daemonStarted;
|
||||||
bool triggerMetadataSync;
|
bool triggerMetadataSync;
|
||||||
Latch *latch; /* pointer to the background worker's latch */
|
Latch *latch; /* pointer to the background worker's latch */
|
||||||
} MaintenanceDaemonDBData;
|
} MaintenanceDaemonDBData;
|
||||||
|
@ -102,6 +102,7 @@ static HTAB *MaintenanceDaemonDBHash;
|
||||||
|
|
||||||
static volatile sig_atomic_t got_SIGHUP = false;
|
static volatile sig_atomic_t got_SIGHUP = false;
|
||||||
|
|
||||||
|
static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
|
||||||
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
|
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
|
||||||
static size_t MaintenanceDaemonShmemSize(void);
|
static size_t MaintenanceDaemonShmemSize(void);
|
||||||
static void MaintenanceDaemonShmemInit(void);
|
static void MaintenanceDaemonShmemInit(void);
|
||||||
|
@ -154,11 +155,17 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
ereport(ERROR, (errmsg("ran out of database slots")));
|
ereport(ERROR, (errmsg("ran out of database slots")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* maintenance daemon can ignore itself */
|
||||||
|
if (dbData->workerPid == MyProcPid)
|
||||||
|
{
|
||||||
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!found || !dbData->daemonStarted)
|
if (!found || !dbData->daemonStarted)
|
||||||
{
|
{
|
||||||
BackgroundWorker worker;
|
BackgroundWorker worker;
|
||||||
BackgroundWorkerHandle *handle = NULL;
|
BackgroundWorkerHandle *handle = NULL;
|
||||||
int pid = 0;
|
|
||||||
|
|
||||||
dbData->userOid = extensionOwner;
|
dbData->userOid = extensionOwner;
|
||||||
|
|
||||||
|
@ -173,13 +180,11 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* No point in getting started before able to run query, but we do
|
* No point in getting started before able to run query, but we do
|
||||||
* want to get started on Hot-Stanby standbys.
|
* want to get started on Hot-Standby.
|
||||||
*/
|
*/
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||||
|
|
||||||
/*
|
/* Restart after a bit after errors, but don't bog the system. */
|
||||||
* Restart after a bit after errors, but don't bog the system.
|
|
||||||
*/
|
|
||||||
worker.bgw_restart_time = 5;
|
worker.bgw_restart_time = 5;
|
||||||
sprintf(worker.bgw_library_name, "citus");
|
sprintf(worker.bgw_library_name, "citus");
|
||||||
sprintf(worker.bgw_function_name, "CitusMaintenanceDaemonMain");
|
sprintf(worker.bgw_function_name, "CitusMaintenanceDaemonMain");
|
||||||
|
@ -198,7 +203,10 @@ InitializeMaintenanceDaemonBackend(void)
|
||||||
dbData->triggerMetadataSync = false;
|
dbData->triggerMetadataSync = false;
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
|
pid_t pid;
|
||||||
WaitForBackgroundWorkerStartup(handle, &pid);
|
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||||
|
|
||||||
|
pfree(handle);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -245,13 +253,14 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
|
||||||
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
hash_search(MaintenanceDaemonDBHash, &databaseOid,
|
||||||
HASH_FIND, NULL);
|
HASH_FIND, NULL);
|
||||||
if (!myDbData)
|
if (!myDbData || myDbData->workerPid != 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* When the database crashes, background workers are restarted, but
|
* When the database crashes, background workers are restarted, but
|
||||||
* the state in shared memory is lost. In that case, we exit and
|
* the state in shared memory is lost. In that case, we exit and
|
||||||
* wait for a session to call InitializeMaintenanceDaemonBackend
|
* wait for a session to call InitializeMaintenanceDaemonBackend
|
||||||
* to properly add it to the hash.
|
* to properly add it to the hash.
|
||||||
|
* Alternatively, don't continue if another worker exists.
|
||||||
*/
|
*/
|
||||||
proc_exit(0);
|
proc_exit(0);
|
||||||
}
|
}
|
||||||
|
@ -260,7 +269,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
myDbData->workerPid = MyProcPid;
|
myDbData->workerPid = MyProcPid;
|
||||||
|
|
||||||
/* wire up signals */
|
/* wire up signals */
|
||||||
pqsignal(SIGTERM, die);
|
pqsignal(SIGTERM, MaintenanceDaemonSigTermHandler);
|
||||||
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
|
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
|
||||||
BackgroundWorkerUnblockSignals();
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
|
@ -300,6 +309,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
Assert(myDbData->workerPid == MyProcPid);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* XXX: Each task should clear the metadata cache before every iteration
|
* XXX: Each task should clear the metadata cache before every iteration
|
||||||
* by calling InvalidateMetadataSystemCache(), because otherwise it
|
* by calling InvalidateMetadataSystemCache(), because otherwise it
|
||||||
|
@ -530,6 +541,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
|
||||||
myDbData->daemonStarted = false;
|
myDbData->daemonStarted = false;
|
||||||
|
myDbData->workerPid = 0;
|
||||||
LWLockRelease(&MaintenanceDaemonControl->lock);
|
LWLockRelease(&MaintenanceDaemonControl->lock);
|
||||||
|
|
||||||
/* return code of 1 requests worker restart */
|
/* return code of 1 requests worker restart */
|
||||||
|
@ -630,6 +642,14 @@ MaintenanceDaemonShmemInit(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
|
||||||
|
static void
|
||||||
|
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
proc_exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MaintenanceDaemonSigHupHandler set a flag to re-read config file at next
|
* MaintenanceDaemonSigHupHandler set a flag to re-read config file at next
|
||||||
* convenient time.
|
* convenient time.
|
||||||
|
@ -709,6 +729,7 @@ StopMaintenanceDaemon(Oid databaseId)
|
||||||
MaintenanceDaemonDBHash,
|
MaintenanceDaemonDBHash,
|
||||||
&databaseId,
|
&databaseId,
|
||||||
HASH_REMOVE, &found);
|
HASH_REMOVE, &found);
|
||||||
|
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
workerPid = dbData->workerPid;
|
workerPid = dbData->workerPid;
|
||||||
|
|
|
@ -71,6 +71,7 @@ extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_o
|
||||||
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
|
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
|
||||||
|
|
||||||
/* extension.c - forward declarations */
|
/* extension.c - forward declarations */
|
||||||
|
extern bool IsDropCitusStmt(Node *parsetree);
|
||||||
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
|
extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree);
|
||||||
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
|
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
|
||||||
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);
|
extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString);
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
-- It'd be nice to script generation of this file, but alas, that's
|
-- It'd be nice to script generation of this file, but alas, that's
|
||||||
-- not done yet.
|
-- not done yet.
|
||||||
SET citus.next_shard_id TO 580000;
|
SET citus.next_shard_id TO 580000;
|
||||||
CREATE SCHEMA test;
|
SELECT $definition$
|
||||||
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
||||||
RETURNS pg_stat_activity
|
RETURNS pg_stat_activity
|
||||||
LANGUAGE plpgsql
|
LANGUAGE plpgsql
|
||||||
|
@ -14,18 +14,23 @@ AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
activity record;
|
activity record;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
DO 'BEGIN END'; -- Force maintenance daemon to start
|
||||||
LOOP
|
LOOP
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
SELECT * INTO activity FROM pg_stat_activity
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
||||||
IF activity.pid IS NOT NULL THEN
|
IF activity.pid IS NOT NULL THEN
|
||||||
RETURN activity;
|
RETURN activity;
|
||||||
ELSE
|
ELSE
|
||||||
PERFORM pg_sleep(0.1);
|
PERFORM pg_sleep(0.1);
|
||||||
PERFORM pg_stat_clear_snapshot();
|
|
||||||
END IF ;
|
END IF ;
|
||||||
END LOOP;
|
END LOOP;
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
$definition$ create_function_test_maintenance_worker
|
||||||
|
\gset
|
||||||
|
CREATE SCHEMA test;
|
||||||
|
:create_function_test_maintenance_worker
|
||||||
-- check maintenance daemon is started
|
-- check maintenance daemon is started
|
||||||
SELECT datname, current_database(),
|
SELECT datname, current_database(),
|
||||||
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
||||||
|
@ -221,6 +226,45 @@ ALTER EXTENSION citus UPDATE;
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
-- test https://github.com/citusdata/citus/issues/3409
|
||||||
|
CREATE USER testuser2 SUPERUSER;
|
||||||
|
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
|
||||||
|
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
||||||
|
SET ROLE testuser2;
|
||||||
|
DROP EXTENSION Citus;
|
||||||
|
-- Loop until we see there's no maintenance daemon running
|
||||||
|
DO $$begin
|
||||||
|
for i in 0 .. 100 loop
|
||||||
|
if i = 100 then raise 'Waited too long'; end if;
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
|
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
|
||||||
|
if not found then exit; end if;
|
||||||
|
perform pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$;
|
||||||
|
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
datid | datname | usename
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
CREATE EXTENSION Citus;
|
||||||
|
-- Loop until we there's a maintenance daemon running
|
||||||
|
DO $$begin
|
||||||
|
for i in 0 .. 100 loop
|
||||||
|
if i = 100 then raise 'Waited too long'; end if;
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
|
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
|
||||||
|
if found then exit; end if;
|
||||||
|
perform pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$;
|
||||||
|
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
datid | datname | usename
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16384 | regression | testuser2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
-- check that maintenance daemon gets (re-)started for the right user
|
-- check that maintenance daemon gets (re-)started for the right user
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
CREATE USER testuser SUPERUSER;
|
CREATE USER testuser SUPERUSER;
|
||||||
|
@ -246,26 +290,8 @@ HINT: You can manually create a database and its extensions on workers.
|
||||||
\c another
|
\c another
|
||||||
CREATE EXTENSION citus;
|
CREATE EXTENSION citus;
|
||||||
CREATE SCHEMA test;
|
CREATE SCHEMA test;
|
||||||
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
:create_function_test_maintenance_worker
|
||||||
RETURNS pg_stat_activity
|
-- see that the daemon started
|
||||||
LANGUAGE plpgsql
|
|
||||||
AS $$
|
|
||||||
DECLARE
|
|
||||||
activity record;
|
|
||||||
BEGIN
|
|
||||||
LOOP
|
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
|
||||||
IF activity.pid IS NOT NULL THEN
|
|
||||||
RETURN activity;
|
|
||||||
ELSE
|
|
||||||
PERFORM pg_sleep(0.1);
|
|
||||||
PERFORM pg_stat_clear_snapshot();
|
|
||||||
END IF ;
|
|
||||||
END LOOP;
|
|
||||||
END;
|
|
||||||
$$;
|
|
||||||
-- see that the deamon started
|
|
||||||
SELECT datname, current_database(),
|
SELECT datname, current_database(),
|
||||||
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
||||||
FROM test.maintenance_worker();
|
FROM test.maintenance_worker();
|
||||||
|
@ -276,12 +302,12 @@ FROM test.maintenance_worker();
|
||||||
|
|
||||||
-- Test that database with active worker can be dropped.
|
-- Test that database with active worker can be dropped.
|
||||||
\c regression
|
\c regression
|
||||||
CREATE SCHEMA test_deamon;
|
CREATE SCHEMA test_daemon;
|
||||||
-- we create a similar function on the regression database
|
-- we create a similar function on the regression database
|
||||||
-- note that this function checks for the existence of the daemon
|
-- note that this function checks for the existence of the daemon
|
||||||
-- when not found, returns true else tries for 5 times and
|
-- when not found, returns true else tries for 5 times and
|
||||||
-- returns false
|
-- returns false
|
||||||
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
|
CREATE OR REPLACE FUNCTION test_daemon.maintenance_daemon_died(p_dbname text)
|
||||||
RETURNS boolean
|
RETURNS boolean
|
||||||
LANGUAGE plpgsql
|
LANGUAGE plpgsql
|
||||||
AS $$
|
AS $$
|
||||||
|
@ -289,31 +315,29 @@ DECLARE
|
||||||
activity record;
|
activity record;
|
||||||
BEGIN
|
BEGIN
|
||||||
PERFORM pg_stat_clear_snapshot();
|
PERFORM pg_stat_clear_snapshot();
|
||||||
LOOP
|
SELECT * INTO activity FROM pg_stat_activity
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
IF activity.pid IS NULL THEN
|
||||||
IF activity.pid IS NULL THEN
|
RETURN true;
|
||||||
RETURN true;
|
ELSE
|
||||||
ELSE
|
RETURN false;
|
||||||
RETURN false;
|
END IF;
|
||||||
END IF;
|
|
||||||
END LOOP;
|
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
-- drop the database and see that the deamon is dead
|
-- drop the database and see that the daemon is dead
|
||||||
DROP DATABASE another;
|
DROP DATABASE another;
|
||||||
SELECT
|
SELECT
|
||||||
*
|
*
|
||||||
FROM
|
FROM
|
||||||
test_deamon.maintenance_deamon_died('another');
|
test_daemon.maintenance_daemon_died('another');
|
||||||
maintenance_deamon_died
|
maintenance_daemon_died
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- we don't need the schema and the function anymore
|
-- we don't need the schema and the function anymore
|
||||||
DROP SCHEMA test_deamon CASCADE;
|
DROP SCHEMA test_daemon CASCADE;
|
||||||
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
|
NOTICE: drop cascades to function test_daemon.maintenance_daemon_died(text)
|
||||||
-- verify citus does not crash while creating a table when run against an older worker
|
-- verify citus does not crash while creating a table when run against an older worker
|
||||||
-- create_distributed_table piggybacks multiple commands into single one, if one worker
|
-- create_distributed_table piggybacks multiple commands into single one, if one worker
|
||||||
-- did not have the required UDF it should fail instead of crash.
|
-- did not have the required UDF it should fail instead of crash.
|
||||||
|
|
|
@ -9,8 +9,7 @@
|
||||||
|
|
||||||
SET citus.next_shard_id TO 580000;
|
SET citus.next_shard_id TO 580000;
|
||||||
|
|
||||||
CREATE SCHEMA test;
|
SELECT $definition$
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
||||||
RETURNS pg_stat_activity
|
RETURNS pg_stat_activity
|
||||||
LANGUAGE plpgsql
|
LANGUAGE plpgsql
|
||||||
|
@ -18,18 +17,24 @@ AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
activity record;
|
activity record;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
DO 'BEGIN END'; -- Force maintenance daemon to start
|
||||||
LOOP
|
LOOP
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
SELECT * INTO activity FROM pg_stat_activity
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
||||||
IF activity.pid IS NOT NULL THEN
|
IF activity.pid IS NOT NULL THEN
|
||||||
RETURN activity;
|
RETURN activity;
|
||||||
ELSE
|
ELSE
|
||||||
PERFORM pg_sleep(0.1);
|
PERFORM pg_sleep(0.1);
|
||||||
PERFORM pg_stat_clear_snapshot();
|
|
||||||
END IF ;
|
END IF ;
|
||||||
END LOOP;
|
END LOOP;
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
$definition$ create_function_test_maintenance_worker
|
||||||
|
\gset
|
||||||
|
|
||||||
|
CREATE SCHEMA test;
|
||||||
|
:create_function_test_maintenance_worker
|
||||||
|
|
||||||
-- check maintenance daemon is started
|
-- check maintenance daemon is started
|
||||||
SELECT datname, current_database(),
|
SELECT datname, current_database(),
|
||||||
|
@ -207,6 +212,35 @@ ALTER EXTENSION citus UPDATE;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- test https://github.com/citusdata/citus/issues/3409
|
||||||
|
CREATE USER testuser2 SUPERUSER;
|
||||||
|
SET ROLE testuser2;
|
||||||
|
DROP EXTENSION Citus;
|
||||||
|
-- Loop until we see there's no maintenance daemon running
|
||||||
|
DO $$begin
|
||||||
|
for i in 0 .. 100 loop
|
||||||
|
if i = 100 then raise 'Waited too long'; end if;
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
|
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
|
||||||
|
if not found then exit; end if;
|
||||||
|
perform pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$;
|
||||||
|
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
CREATE EXTENSION Citus;
|
||||||
|
-- Loop until we there's a maintenance daemon running
|
||||||
|
DO $$begin
|
||||||
|
for i in 0 .. 100 loop
|
||||||
|
if i = 100 then raise 'Waited too long'; end if;
|
||||||
|
PERFORM pg_stat_clear_snapshot();
|
||||||
|
perform * from pg_stat_activity where application_name = 'Citus Maintenance Daemon';
|
||||||
|
if found then exit; end if;
|
||||||
|
perform pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$;
|
||||||
|
SELECT datid, datname, usename FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
-- check that maintenance daemon gets (re-)started for the right user
|
-- check that maintenance daemon gets (re-)started for the right user
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
CREATE USER testuser SUPERUSER;
|
CREATE USER testuser SUPERUSER;
|
||||||
|
@ -229,28 +263,9 @@ CREATE DATABASE another;
|
||||||
CREATE EXTENSION citus;
|
CREATE EXTENSION citus;
|
||||||
|
|
||||||
CREATE SCHEMA test;
|
CREATE SCHEMA test;
|
||||||
|
:create_function_test_maintenance_worker
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
|
-- see that the daemon started
|
||||||
RETURNS pg_stat_activity
|
|
||||||
LANGUAGE plpgsql
|
|
||||||
AS $$
|
|
||||||
DECLARE
|
|
||||||
activity record;
|
|
||||||
BEGIN
|
|
||||||
LOOP
|
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
|
||||||
IF activity.pid IS NOT NULL THEN
|
|
||||||
RETURN activity;
|
|
||||||
ELSE
|
|
||||||
PERFORM pg_sleep(0.1);
|
|
||||||
PERFORM pg_stat_clear_snapshot();
|
|
||||||
END IF ;
|
|
||||||
END LOOP;
|
|
||||||
END;
|
|
||||||
$$;
|
|
||||||
|
|
||||||
-- see that the deamon started
|
|
||||||
SELECT datname, current_database(),
|
SELECT datname, current_database(),
|
||||||
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
usename, (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
|
||||||
FROM test.maintenance_worker();
|
FROM test.maintenance_worker();
|
||||||
|
@ -258,13 +273,13 @@ FROM test.maintenance_worker();
|
||||||
-- Test that database with active worker can be dropped.
|
-- Test that database with active worker can be dropped.
|
||||||
\c regression
|
\c regression
|
||||||
|
|
||||||
CREATE SCHEMA test_deamon;
|
CREATE SCHEMA test_daemon;
|
||||||
|
|
||||||
-- we create a similar function on the regression database
|
-- we create a similar function on the regression database
|
||||||
-- note that this function checks for the existence of the daemon
|
-- note that this function checks for the existence of the daemon
|
||||||
-- when not found, returns true else tries for 5 times and
|
-- when not found, returns true else tries for 5 times and
|
||||||
-- returns false
|
-- returns false
|
||||||
CREATE OR REPLACE FUNCTION test_deamon.maintenance_deamon_died(p_dbname text)
|
CREATE OR REPLACE FUNCTION test_daemon.maintenance_daemon_died(p_dbname text)
|
||||||
RETURNS boolean
|
RETURNS boolean
|
||||||
LANGUAGE plpgsql
|
LANGUAGE plpgsql
|
||||||
AS $$
|
AS $$
|
||||||
|
@ -272,27 +287,25 @@ DECLARE
|
||||||
activity record;
|
activity record;
|
||||||
BEGIN
|
BEGIN
|
||||||
PERFORM pg_stat_clear_snapshot();
|
PERFORM pg_stat_clear_snapshot();
|
||||||
LOOP
|
SELECT * INTO activity FROM pg_stat_activity
|
||||||
SELECT * INTO activity FROM pg_stat_activity
|
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
||||||
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
|
IF activity.pid IS NULL THEN
|
||||||
IF activity.pid IS NULL THEN
|
RETURN true;
|
||||||
RETURN true;
|
ELSE
|
||||||
ELSE
|
RETURN false;
|
||||||
RETURN false;
|
END IF;
|
||||||
END IF;
|
|
||||||
END LOOP;
|
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
-- drop the database and see that the deamon is dead
|
-- drop the database and see that the daemon is dead
|
||||||
DROP DATABASE another;
|
DROP DATABASE another;
|
||||||
SELECT
|
SELECT
|
||||||
*
|
*
|
||||||
FROM
|
FROM
|
||||||
test_deamon.maintenance_deamon_died('another');
|
test_daemon.maintenance_daemon_died('another');
|
||||||
|
|
||||||
-- we don't need the schema and the function anymore
|
-- we don't need the schema and the function anymore
|
||||||
DROP SCHEMA test_deamon CASCADE;
|
DROP SCHEMA test_daemon CASCADE;
|
||||||
|
|
||||||
|
|
||||||
-- verify citus does not crash while creating a table when run against an older worker
|
-- verify citus does not crash while creating a table when run against an older worker
|
||||||
|
|
Loading…
Reference in New Issue