Merge pull request #1461 from citusdata/feature/maintenanced

Add automatically started per-database Maintenance Worker
Andres Freund 2017-06-23 12:12:31 -07:00 committed by GitHub
commit 71a4e90e82
8 changed files with 839 additions and 159 deletions
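Before the per-file diffs: the core mechanism of this commit is dynamic registration of one background worker per database, triggered the first time Citus is used in a backend. Below is a condensed sketch of that registration pattern against the PostgreSQL 9.6-era bgworker API; the worker and library names ("sketch", "SketchWorkerMain") are placeholders, not this commit's identifiers (those appear in maintenanced.c further down).

#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"

static void
LaunchPerDatabaseWorker(void)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle = NULL;
	pid_t pid = 0;

	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "sketch worker: %u", MyDatabaseId);

	/* shared memory access plus the ability to connect to a database */
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

	/* don't start until the cluster can run queries */
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = 5;	/* seconds between restarts after errors */

	/* entry point, resolved when the postmaster starts the worker */
	snprintf(worker.bgw_library_name, BGW_MAXLEN, "sketch");
	snprintf(worker.bgw_function_name, BGW_MAXLEN, "SketchWorkerMain");

	/* pass the target database through the worker's main argument */
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
		ereport(ERROR, (errmsg("could not register background worker")));

	/* block until the postmaster has actually forked the worker */
	WaitForBackgroundWorkerStartup(handle, &pid);
}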

multi_utility.c

@ -453,6 +453,12 @@ multi_ProcessUtility(Node *parsetree,
ProcessVacuumStmt(vacuumStmt, queryString);
}
/*
* Ensure the cached value is valid; some checks can't run during CREATE
* EXTENSION. Calling this here also ensures the relevant invalidation
* callbacks get registered.
*/
CitusHasBeenLoaded();
}

shared_library_init.c

@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* shared_library_init.c
* Initialize Citus extension
* Functionality related to the initialization of the Citus extension.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*-------------------------------------------------------------------------
@ -22,6 +22,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/connection_management.h"
#include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
@ -36,6 +37,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
@ -160,6 +162,8 @@ _PG_init(void)
set_rel_pathlist_hook = multi_relation_restriction_hook;
set_join_pathlist_hook = multi_join_restriction_hook;
InitializeMaintenanceDaemon();
/* arrange for the task tracker to be started once the server is up */
TaskTrackerRegister();
@ -177,6 +181,21 @@ _PG_init(void)
}
/*
* StartupCitusBackend initializes per-backend infrastructure, and is called
* the first time citus is used in a database.
*
* NB: All code here has to be able to cope with this routine being called
* multiple times in the same backend. This will e.g. happen when the
* extension is created or upgraded.
*/
void
StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
}
/*
* CreateRequiredDirectories - Create directories required for Citus to
* function.

maintenanced.c

@ -0,0 +1,412 @@
/*-------------------------------------------------------------------------
*
* maintenanced.c
* Background worker run for each Citus-using database in a PostgreSQL
* cluster.
*
* This file provides infrastructure for launching exactly one background
* worker for every database in which Citus is used. That background worker
* can then perform work like deadlock detection, prepared transaction
* recovery, and cleanup.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "access/xact.h"
#include "libpq/pqsignal.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "tcop/tcopprot.h"
/*
* Shared memory data for all maintenance workers.
*/
typedef struct MaintenanceDaemonControlData
{
/*
* Lock protecting the shared memory state. This is to be taken when
* looking up (shared mode) or inserting (exclusive mode) per-database
* data in dbHash.
*/
int trancheId;
LWLockTranche lockTranche;
LWLock lock;
/*
* Hash-table of workers, one entry for each database with citus
* activated.
*/
HTAB *dbHash;
} MaintenanceDaemonControlData;
/*
* Per database worker state.
*/
typedef struct MaintenanceDaemonDBData
{
/* hash key: database to run on */
Oid databaseOid;
/* information: which user to use */
Oid userOid;
bool daemonStarted;
Latch *latch; /* pointer to the background worker's latch */
} MaintenanceDaemonDBData;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
static volatile sig_atomic_t got_SIGHUP = false;
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
static void MaintenanceDaemonErrorContext(void *arg);
/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
* requesting shared memory and related infrastructure required by maintenance
* daemons.
*/
void
InitializeMaintenanceDaemon(void)
{
RequestAddinShmemSpace(MaintenanceDaemonShmemSize());
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = MaintenanceDaemonShmemInit;
}
/*
* InitializeMaintenanceDaemonBackend, called at backend start and on
* configuration changes, is responsible for starting a per-database
* maintenance worker if necessary.
*/
void
InitializeMaintenanceDaemonBackend(void)
{
MaintenanceDaemonDBData *dbData = NULL;
Oid extensionOwner = CitusExtensionOwner();
bool found;
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
dbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonControl->dbHash,
&MyDatabaseId,
HASH_ENTER_NULL, &found);
if (dbData == NULL)
{
/* FIXME: better message, reference relevant guc in hint */
ereport(ERROR, (errmsg("ran out of database slots")));
}
if (!found || !dbData->daemonStarted)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
int pid = 0;
dbData->userOid = extensionOwner;
memset(&worker, 0, sizeof(worker));
snprintf(worker.bgw_name, BGW_MAXLEN,
"Citus Maintenance Daemon: %u/%u",
MyDatabaseId, extensionOwner);
/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
/*
* No point in getting started before we are able to run queries, but we
* do want to get started on Hot Standby nodes.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/*
* Restart a few seconds after an error, but don't bog down the system.
*/
worker.bgw_restart_time = 5;
sprintf(worker.bgw_library_name, "citus");
sprintf(worker.bgw_function_name, "CitusMaintenanceDaemonMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy(worker.bgw_extra, &extensionOwner, sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
ereport(ERROR, (errmsg("could not start maintenance background worker"),
errhint("Increasing max_worker_processes might help.")));
}
dbData->daemonStarted = true;
LWLockRelease(&MaintenanceDaemonControl->lock);
WaitForBackgroundWorkerStartup(handle, &pid);
}
else
{
Assert(dbData->daemonStarted);
/*
* If the extension owner changed, wake up the daemon. It'll notice
* and restart.
*/
if (dbData->userOid != extensionOwner)
{
dbData->userOid = extensionOwner;
if (dbData->latch)
{
SetLatch(dbData->latch);
}
}
LWLockRelease(&MaintenanceDaemonControl->lock);
}
}
/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine; it
* is started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
MaintenanceDaemonDBData *myDbData = NULL;
ErrorContextCallback errorCallback;
/*
* Look up this worker's configuration.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_SHARED);
myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonControl->dbHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)
{
/* should never happen */
ereport(ERROR, (errmsg("got lost finding myself")));
}
LWLockRelease(&MaintenanceDaemonControl->lock);
myDbData->latch = MyLatch;
/*
* Set up the error context so log messages can be properly attributed. Some of
* them otherwise sound like they might be from a normal user connection.
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;
/* wire up signals */
pqsignal(SIGTERM, die);
pqsignal(SIGHUP, MaintenanceDaemonSigHupHandler);
BackgroundWorkerUnblockSignals();
elog(LOG, "starting maintenance daemon on database %u user %u",
databaseOid, myDbData->userOid);
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid);
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");
/* enter main loop */
for (;;)
{
int rc;
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
int timeout = 10000; /* wake up at least every so often */
CHECK_FOR_INTERRUPTS();
/*
* Perform Work. If a specific task needs to be called sooner than
* timeout indicates, it's ok to lower it to that value. Expensive
* tasks should do their own time math about whether to re-run checks.
*/
/*
* Wait until timeout, or until somebody wakes us up.
*/
rc = WaitLatch(MyLatch, latchFlags, timeout);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
{
proc_exit(1);
}
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* check for changed configuration */
if (myDbData->userOid != GetSessionUserId())
{
/* return code of 1 requests worker restart */
proc_exit(1);
}
/*
* We could also check whether the extension still exists, but
* that'd complicate things a bit, because we'd have to delete the
* shared memory entry. There'd potentially be a race where the
* extension gets re-created and sees that this entry still exists,
* just as it is being deleted. Doesn't seem worth catering for that.
*/
}
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
}
}
/*
* MaintenanceDaemonShmemSize computes how much shared memory is required.
*/
static size_t
MaintenanceDaemonShmemSize(void)
{
Size size = 0;
Size hashSize = 0;
size = add_size(size, sizeof(MaintenanceDaemonControlData));
/*
* We request enough shared memory to have one hash-table entry for each
* worker process. We couldn't start more anyway, so there's little point
* in allocating more.
*/
hashSize = hash_estimate_size(max_worker_processes, sizeof(MaintenanceDaemonDBData));
size = add_size(size, hashSize);
return size;
}
/*
* MaintenanceDaemonShmemInit initializes the requested shared memory for the
* maintenance daemon.
*/
static void
MaintenanceDaemonShmemInit(void)
{
bool alreadyInitialized = false;
HASHCTL hashInfo;
int hashFlags = 0;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
MaintenanceDaemonControl =
(MaintenanceDaemonControlData *) ShmemInitStruct("Citus Maintenance Daemon",
MaintenanceDaemonShmemSize(),
&alreadyInitialized);
/*
* Might already be initialized on EXEC_BACKEND type platforms that call
* shared library initialization functions in every backend.
*/
if (!alreadyInitialized)
{
/* initialize lwlock */
LWLockTranche *tranche = &MaintenanceDaemonControl->lockTranche;
/* start by zeroing out all the memory */
memset(MaintenanceDaemonControl, 0, MaintenanceDaemonShmemSize());
/* initialize lock */
MaintenanceDaemonControl->trancheId = LWLockNewTrancheId();
tranche->array_base = &MaintenanceDaemonControl->lock;
tranche->array_stride = sizeof(LWLock);
tranche->name = "Citus Maintenance Daemon";
LWLockRegisterTranche(MaintenanceDaemonControl->trancheId, tranche);
LWLockInitialize(&MaintenanceDaemonControl->lock,
MaintenanceDaemonControl->trancheId);
}
memset(&hashInfo, 0, sizeof(hashInfo));
hashInfo.keysize = sizeof(Oid);
hashInfo.entrysize = sizeof(MaintenanceDaemonDBData);
hashInfo.hash = tag_hash;
hashFlags = (HASH_ELEM | HASH_FUNCTION);
MaintenanceDaemonControl->dbHash =
ShmemInitHash("Maintenance Database Hash",
max_worker_processes, max_worker_processes,
&hashInfo, hashFlags);
LWLockRelease(AddinShmemInitLock);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
* MaintenanceDaemonSigHupHandler sets a flag to re-read the config file at
* the next convenient time.
*/
static void
MaintenanceDaemonSigHupHandler(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGHUP = true;
if (MyProc != NULL)
{
SetLatch(&MyProc->procLatch);
}
errno = save_errno;
}
/*
* MaintenanceDaemonErrorContext adds some context to log messages to make it
* easier to associate them with the maintenance daemon.
*/
static void
MaintenanceDaemonErrorContext(void *arg)
{
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) arg;
errcontext("Citus maintenance daemon for database %u user %u",
myDbData->databaseOid, myDbData->userOid);
}
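The main loop above deliberately leaves the "Perform Work" slot empty; its comment sketches the intended protocol: each task keeps its own schedule and may lower the loop's timeout so the daemon wakes up in time. A hedged illustration of how a periodic task could plug into that slot; the task, its name, and the 60-second interval are hypothetical, not part of this commit:

#include "postgres.h"
#include "utils/timestamp.h"

static TimestampTz lastRecoveryTime = 0;

/*
 * Hypothetical periodic task for the "Perform Work" slot in
 * CitusMaintenanceDaemonMain()'s loop. Returns the (possibly lowered)
 * timeout the loop should pass to WaitLatch().
 */
static int
MaybeRunRecovery(int timeout)
{
	TimestampTz now = GetCurrentTimestamp();
	const int recoveryIntervalMs = 60 * 1000;	/* illustrative interval */

	if (TimestampDifferenceExceeds(lastRecoveryTime, now, recoveryIntervalMs))
	{
		/* expensive work, e.g. prepared transaction recovery, goes here */
		lastRecoveryTime = now;
	}

	/* wake up no later than the task's next due time */
	return Min(timeout, recoveryIntervalMs);
}

Inside the loop, `timeout = MaybeRunRecovery(timeout);` would then run just before the WaitLatch() call.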

metadata_cache.c

@ -35,6 +35,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
#include "distributed/shared_library_init.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -81,27 +82,39 @@ typedef struct ShardCacheEntry
} ShardCacheEntry;
/* state which should be cleared upon DROP EXTENSION */
static bool extensionLoaded = false;
static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distColocationRelationId = InvalidOid;
static Oid distColocationConfigurationIndexId = InvalidOid;
static Oid distColocationColocationidIndexId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
static Oid distShardLogicalRelidIndexId = InvalidOid;
static Oid distShardShardidIndexId = InvalidOid;
static Oid distShardPlacementShardidIndexId = InvalidOid;
static Oid distShardPlacementPlacementidIndexId = InvalidOid;
static Oid distShardPlacementNodeidIndexId = InvalidOid;
static Oid distTransactionRelationId = InvalidOid;
static Oid distTransactionGroupIndexId = InvalidOid;
static Oid extraDataContainerFuncId = InvalidOid;
static Oid workerHashFunctionId = InvalidOid;
/*
* State which should be cleared upon DROP EXTENSION. When the configuration
* changes, e.g. because the extension is dropped, these summarily get set to
* 0.
*/
typedef struct MetadataCacheData
{
bool extensionLoaded;
Oid distShardRelationId;
Oid distShardPlacementRelationId;
Oid distNodeRelationId;
Oid distLocalGroupRelationId;
Oid distColocationRelationId;
Oid distColocationConfigurationIndexId;
Oid distColocationColocationidIndexId;
Oid distPartitionRelationId;
Oid distPartitionLogicalRelidIndexId;
Oid distPartitionColocationidIndexId;
Oid distShardLogicalRelidIndexId;
Oid distShardShardidIndexId;
Oid distShardPlacementShardidIndexId;
Oid distShardPlacementPlacementidIndexId;
Oid distShardPlacementNodeidIndexId;
Oid distTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid extraDataContainerFuncId;
Oid workerHashFunctionId;
Oid extensionOwner;
} MetadataCacheData;
static MetadataCacheData MetadataCache;
/* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */
@ -118,8 +131,6 @@ static HTAB *DistShardCacheHash = NULL;
static HTAB *WorkerNodeHash = NULL;
static bool workerNodeHashValid = false;
static bool invalidationRegistered = false;
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
static int LocalGroupId = -1;
@ -148,8 +159,11 @@ static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
FmgrInfo *shardIntervalSortCompareFunction);
static void InitializeCaches(void);
static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void);
static void RegisterWorkerNodeCacheCallbacks(void);
static void RegisterLocalGroupIdCacheCallbacks(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
@ -401,11 +415,7 @@ LookupShardCacheEntry(int64 shardId)
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* probably not reachable */
if (DistShardCacheHash == NULL)
{
InitializeDistTableCache();
}
InitializeCaches();
/* lookup cache entry */
shardEntry = hash_search(DistShardCacheHash, &shardId, HASH_FIND, &foundInCache);
@ -511,10 +521,7 @@ LookupDistTableCacheEntry(Oid relationId)
return NULL;
}
if (DistTableCacheHash == NULL)
{
InitializeDistTableCache();
}
InitializeCaches();
/*
* If the version is not known to be compatible, perform thorough check,
@ -1094,7 +1101,7 @@ bool
CitusHasBeenLoaded(void)
{
/* recheck presence until citus has been loaded */
if (!extensionLoaded || creating_extension)
if (!MetadataCache.extensionLoaded || creating_extension)
{
bool extensionPresent = false;
bool extensionScriptExecuted = true;
@ -1112,12 +1119,21 @@ CitusHasBeenLoaded(void)
{
extensionScriptExecuted = false;
}
/*
* Whenever the extension exists, even while it is being created, the
* infrastructure needed to run Citus in this database must be ready.
*/
StartupCitusBackend();
}
/* we disable extension features during pg_upgrade */
extensionLoaded = extensionPresent && extensionScriptExecuted && !IsBinaryUpgrade;
MetadataCache.extensionLoaded = extensionPresent &&
extensionScriptExecuted &&
!IsBinaryUpgrade;
if (extensionLoaded)
if (MetadataCache.extensionLoaded)
{
/*
* InvalidateDistRelationCacheCallback resets state such as extensionLoaded
@ -1140,7 +1156,7 @@ CitusHasBeenLoaded(void)
}
}
return extensionLoaded;
return MetadataCache.extensionLoaded;
}
@ -1302,6 +1318,8 @@ AvailableExtensionVersion(void)
bool doCopy = false;
char *availableExtensionVersion;
InitializeCaches();
estate = CreateExecutorState();
extensionsResultSet = makeNode(ReturnSetInfo);
extensionsResultSet->econtext = GetPerTupleExprContext(estate);
@ -1334,10 +1352,6 @@ AvailableExtensionVersion(void)
Datum availableVersion = slot_getattr(tupleTableSlot, 2, &isNull);
/* we will cache the result of citus version to prevent catalog access */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
availableExtensionVersion = text_to_cstring(DatumGetTextPP(availableVersion));
@ -1404,11 +1418,6 @@ InstalledExtensionVersion(void)
}
/* we will cache the result of citus version to prevent catalog access */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
oldMemoryContext = MemoryContextSwitchTo(CacheMemoryContext);
installedExtensionVersion = text_to_cstring(DatumGetTextPP(installedVersion));
@ -1433,9 +1442,10 @@ InstalledExtensionVersion(void)
Oid
DistShardRelationId(void)
{
CachedRelationLookup("pg_dist_shard", &distShardRelationId);
CachedRelationLookup("pg_dist_shard",
&MetadataCache.distShardRelationId);
return distShardRelationId;
return MetadataCache.distShardRelationId;
}
@ -1443,9 +1453,10 @@ DistShardRelationId(void)
Oid
DistShardPlacementRelationId(void)
{
CachedRelationLookup("pg_dist_shard_placement", &distShardPlacementRelationId);
CachedRelationLookup("pg_dist_shard_placement",
&MetadataCache.distShardPlacementRelationId);
return distShardPlacementRelationId;
return MetadataCache.distShardPlacementRelationId;
}
@ -1453,9 +1464,10 @@ DistShardPlacementRelationId(void)
Oid
DistNodeRelationId(void)
{
CachedRelationLookup("pg_dist_node", &distNodeRelationId);
CachedRelationLookup("pg_dist_node",
&MetadataCache.distNodeRelationId);
return distNodeRelationId;
return MetadataCache.distNodeRelationId;
}
@ -1463,9 +1475,10 @@ DistNodeRelationId(void)
Oid
DistLocalGroupIdRelationId(void)
{
CachedRelationLookup("pg_dist_local_group", &distLocalGroupRelationId);
CachedRelationLookup("pg_dist_local_group",
&MetadataCache.distLocalGroupRelationId);
return distLocalGroupRelationId;
return MetadataCache.distLocalGroupRelationId;
}
@ -1473,9 +1486,10 @@ DistLocalGroupIdRelationId(void)
Oid
DistColocationRelationId(void)
{
CachedRelationLookup("pg_dist_colocation", &distColocationRelationId);
CachedRelationLookup("pg_dist_colocation",
&MetadataCache.distColocationRelationId);
return distColocationRelationId;
return MetadataCache.distColocationRelationId;
}
@ -1484,9 +1498,9 @@ Oid
DistColocationConfigurationIndexId(void)
{
CachedRelationLookup("pg_dist_colocation_configuration_index",
&distColocationConfigurationIndexId);
&MetadataCache.distColocationConfigurationIndexId);
return distColocationConfigurationIndexId;
return MetadataCache.distColocationConfigurationIndexId;
}
@ -1495,9 +1509,9 @@ Oid
DistColocationColocationidIndexId(void)
{
CachedRelationLookup("pg_dist_colocation_pkey",
&distColocationColocationidIndexId);
&MetadataCache.distColocationColocationidIndexId);
return distColocationColocationidIndexId;
return MetadataCache.distColocationColocationidIndexId;
}
@ -1505,9 +1519,10 @@ DistColocationColocationidIndexId(void)
Oid
DistPartitionRelationId(void)
{
CachedRelationLookup("pg_dist_partition", &distPartitionRelationId);
CachedRelationLookup("pg_dist_partition",
&MetadataCache.distPartitionRelationId);
return distPartitionRelationId;
return MetadataCache.distPartitionRelationId;
}
@ -1516,9 +1531,9 @@ Oid
DistPartitionLogicalRelidIndexId(void)
{
CachedRelationLookup("pg_dist_partition_logical_relid_index",
&distPartitionLogicalRelidIndexId);
&MetadataCache.distPartitionLogicalRelidIndexId);
return distPartitionLogicalRelidIndexId;
return MetadataCache.distPartitionLogicalRelidIndexId;
}
@ -1527,9 +1542,9 @@ Oid
DistPartitionColocationidIndexId(void)
{
CachedRelationLookup("pg_dist_partition_colocationid_index",
&distPartitionColocationidIndexId);
&MetadataCache.distPartitionColocationidIndexId);
return distPartitionColocationidIndexId;
return MetadataCache.distPartitionColocationidIndexId;
}
@ -1538,9 +1553,9 @@ Oid
DistShardLogicalRelidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_logical_relid_index",
&distShardLogicalRelidIndexId);
&MetadataCache.distShardLogicalRelidIndexId);
return distShardLogicalRelidIndexId;
return MetadataCache.distShardLogicalRelidIndexId;
}
@ -1548,9 +1563,10 @@ DistShardLogicalRelidIndexId(void)
Oid
DistShardShardidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_shardid_index", &distShardShardidIndexId);
CachedRelationLookup("pg_dist_shard_shardid_index",
&MetadataCache.distShardShardidIndexId);
return distShardShardidIndexId;
return MetadataCache.distShardShardidIndexId;
}
@ -1559,9 +1575,9 @@ Oid
DistShardPlacementShardidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_shardid_index",
&distShardPlacementShardidIndexId);
&MetadataCache.distShardPlacementShardidIndexId);
return distShardPlacementShardidIndexId;
return MetadataCache.distShardPlacementShardidIndexId;
}
@ -1570,9 +1586,9 @@ Oid
DistShardPlacementPlacementidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_placementid_index",
&distShardPlacementPlacementidIndexId);
&MetadataCache.distShardPlacementPlacementidIndexId);
return distShardPlacementPlacementidIndexId;
return MetadataCache.distShardPlacementPlacementidIndexId;
}
@ -1580,9 +1596,10 @@ DistShardPlacementPlacementidIndexId(void)
Oid
DistTransactionRelationId(void)
{
CachedRelationLookup("pg_dist_transaction", &distTransactionRelationId);
CachedRelationLookup("pg_dist_transaction",
&MetadataCache.distTransactionRelationId);
return distTransactionRelationId;
return MetadataCache.distTransactionRelationId;
}
@ -1591,9 +1608,9 @@ Oid
DistTransactionGroupIndexId(void)
{
CachedRelationLookup("pg_dist_transaction_group_index",
&distTransactionGroupIndexId);
&MetadataCache.distTransactionGroupIndexId);
return distTransactionGroupIndexId;
return MetadataCache.distTransactionGroupIndexId;
}
@ -1602,9 +1619,9 @@ Oid
DistShardPlacementNodeidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_nodeid_index",
&distShardPlacementNodeidIndexId);
&MetadataCache.distShardPlacementNodeidIndexId);
return distShardPlacementNodeidIndexId;
return MetadataCache.distShardPlacementNodeidIndexId;
}
@ -1615,14 +1632,15 @@ CitusExtraDataContainerFuncId(void)
List *nameList = NIL;
Oid paramOids[1] = { INTERNALOID };
if (extraDataContainerFuncId == InvalidOid)
if (MetadataCache.extraDataContainerFuncId == InvalidOid)
{
nameList = list_make2(makeString("pg_catalog"),
makeString("citus_extradata_container"));
extraDataContainerFuncId = LookupFuncName(nameList, 1, paramOids, false);
MetadataCache.extraDataContainerFuncId =
LookupFuncName(nameList, 1, paramOids, false);
}
return extraDataContainerFuncId;
return MetadataCache.extraDataContainerFuncId;
}
@ -1630,17 +1648,18 @@ CitusExtraDataContainerFuncId(void)
Oid
CitusWorkerHashFunctionId(void)
{
if (workerHashFunctionId == InvalidOid)
if (MetadataCache.workerHashFunctionId == InvalidOid)
{
Oid citusExtensionOid = get_extension_oid("citus", false);
Oid citusSchemaOid = get_extension_schema(citusExtensionOid);
char *citusSchemaName = get_namespace_name(citusSchemaOid);
const int argCount = 1;
workerHashFunctionId = FunctionOid(citusSchemaName, "worker_hash", argCount);
MetadataCache.workerHashFunctionId =
FunctionOid(citusSchemaName, "worker_hash", argCount);
}
return workerHashFunctionId;
return MetadataCache.workerHashFunctionId;
}
@ -1657,11 +1676,10 @@ CitusExtensionOwner(void)
ScanKeyData entry[1];
HeapTuple extensionTuple = NULL;
Form_pg_extension extensionForm = NULL;
static Oid extensionOwner = InvalidOid;
if (extensionOwner != InvalidOid)
if (MetadataCache.extensionOwner != InvalidOid)
{
return extensionOwner;
return MetadataCache.extensionOwner;
}
relation = heap_open(ExtensionRelationId, AccessShareLock);
@ -1693,8 +1711,8 @@ CitusExtensionOwner(void)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("citus extension needs to be owned by superuser")));
}
extensionOwner = extensionForm->extowner;
Assert(OidIsValid(extensionOwner));
MetadataCache.extensionOwner = extensionForm->extowner;
Assert(OidIsValid(MetadataCache.extensionOwner));
}
else
{
@ -1706,7 +1724,7 @@ CitusExtensionOwner(void)
heap_close(relation, AccessShareLock);
return extensionOwner;
return MetadataCache.extensionOwner;
}
@ -1972,18 +1990,39 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
}
/*
* InitializeCaches() registers invalidation handlers for metadata_cache.c's
* caches.
*/
static void
InitializeCaches(void)
{
static bool performedInitialization = false;
if (!performedInitialization)
{
/* set first, to avoid recursion dangers */
performedInitialization = true;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
InitializeDistTableCache();
RegisterWorkerNodeCacheCallbacks();
RegisterLocalGroupIdCacheCallbacks();
}
}
/* initialize the infrastructure for the metadata cache */
static void
InitializeDistTableCache(void)
{
HASHCTL info;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
/* build initial scan keys, copied for every relation scan */
memset(&DistPartitionScanKey, 0, sizeof(DistPartitionScanKey));
@ -2037,6 +2076,8 @@ InitializeDistTableCache(void)
HTAB *
GetWorkerNodeHash(void)
{
InitializeCaches(); /* ensure relevant callbacks are registered */
/*
* We might have some concurrent metadata changes. In order to get the changes,
* we first need to accept the cache invalidation messages.
@ -2062,7 +2103,6 @@ GetWorkerNodeHash(void)
static void
InitializeWorkerNodeCache(void)
{
static bool invalidationRegistered = false;
HTAB *oldWorkerNodeHash = NULL;
List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL;
@ -2070,11 +2110,7 @@ InitializeWorkerNodeCache(void)
int hashFlags = 0;
long maxTableSize = (long) MaxWorkerNodesTracked;
/* make sure we've initialized CacheMemoryContext */
if (CacheMemoryContext == NULL)
{
CreateCacheMemoryContext();
}
InitializeCaches();
/*
* Create the hash that holds the worker nodes. The key is the combination of
@ -2132,16 +2168,20 @@ InitializeWorkerNodeCache(void)
/* now, safe to destroy the old hash */
hash_destroy(oldWorkerNodeHash);
}
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
(Datum) 0);
invalidationRegistered = true;
}
/*
* RegisterWorkerNodeCacheCallbacks registers the callbacks required for the
* worker node cache. It's separate from InitializeWorkerNodeCache so the
* callback can be registered early, before the metadata tables exist.
*/
static void
RegisterWorkerNodeCacheCallbacks(void)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateNodeRelationCacheCallback,
(Datum) 0);
}
@ -2162,6 +2202,8 @@ GetLocalGroupId(void)
Relation pgDistLocalGroupId = NULL;
Oid localGroupTableOid = InvalidOid;
InitializeCaches();
/*
* Already set the group id, no need to read the heap again.
*/
@ -2203,16 +2245,6 @@ GetLocalGroupId(void)
systable_endscan(scanDescriptor);
heap_close(pgDistLocalGroupId, AccessShareLock);
/* prevent multiple invalidation registrations */
if (!invalidationRegistered)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
(Datum) 0);
invalidationRegistered = true;
}
/* set the local cache variable */
LocalGroupId = groupId;
@ -2220,6 +2252,21 @@ GetLocalGroupId(void)
}
/*
* RegisterLocalGroupIdCacheCallbacks registers the callbacks required to
* maintain LocalGroupId at a consistent value. It's separate from
* GetLocalGroupId so the callback can be registered early, before metadata
* tables exist.
*/
static void
RegisterLocalGroupIdCacheCallbacks(void)
{
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(InvalidateLocalGroupIdRelationCacheCallback,
(Datum) 0);
}
/*
* WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations
@ -2382,27 +2429,9 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
* This happens pretty rarely, but most importantly happens during
* DROP EXTENSION citus;
*/
if (relationId != InvalidOid && relationId == distPartitionRelationId)
if (relationId != InvalidOid && relationId == MetadataCache.distPartitionRelationId)
{
extensionLoaded = false;
distShardRelationId = InvalidOid;
distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distNodeRelationId = InvalidOid;
distColocationRelationId = InvalidOid;
distColocationConfigurationIndexId = InvalidOid;
distColocationColocationidIndexId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;
distShardLogicalRelidIndexId = InvalidOid;
distShardShardidIndexId = InvalidOid;
distShardPlacementShardidIndexId = InvalidOid;
distShardPlacementPlacementidIndexId = InvalidOid;
distTransactionRelationId = InvalidOid;
distTransactionGroupIndexId = InvalidOid;
extraDataContainerFuncId = InvalidOid;
workerHashFunctionId = InvalidOid;
memset(&MetadataCache, 0, sizeof(MetadataCache));
}
}
@ -2461,7 +2490,7 @@ DistTableOidList(void)
static void
InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId)
{
if (relationId == InvalidOid || relationId == distNodeRelationId)
if (relationId == InvalidOid || relationId == MetadataCache.distNodeRelationId)
{
workerNodeHashValid = false;
}
@ -2476,7 +2505,7 @@ static void
InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId)
{
/* when invalidation happens simply set the LocalGroupId to the default value */
if (relationId == InvalidOid || relationId == distLocalGroupRelationId)
if (relationId == InvalidOid || relationId == MetadataCache.distLocalGroupRelationId)
{
LocalGroupId = -1;
}
@ -2734,6 +2763,9 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
static void
CachedRelationLookup(const char *relationName, Oid *cachedOid)
{
/* force callbacks to be registered, so we always get notified upon changes */
InitializeCaches();
if (*cachedOid == InvalidOid)
{
*cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE);
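Two patterns in the metadata_cache.c changes above deserve a callout: InitializeCaches() flips its static flag before doing any work, so re-entrant lookups triggered during callback registration cannot recurse forever, and folding every cached OID into the single MetadataCache struct lets the invalidation callback wipe all state with one memset. A distilled sketch with illustrative names:

#include "postgres.h"

typedef struct SketchCacheData
{
	bool loaded;
	Oid someRelationId;
} SketchCacheData;

static SketchCacheData SketchCache;

static void
InitializeSketchCaches(void)
{
	static bool performedInitialization = false;

	if (!performedInitialization)
	{
		/*
		 * Set the flag first: registering callbacks or building hashes
		 * may itself trigger cache lookups that re-enter this function.
		 */
		performedInitialization = true;

		/* register invalidation callbacks, create hash tables, ... */
	}
}

/* invalidation callback body: one memset resets every cached value */
static void
ResetSketchCache(void)
{
	memset(&SketchCache, 0, sizeof(SketchCache));
}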

maintenanced.h

@ -0,0 +1,20 @@
/*-------------------------------------------------------------------------
*
* maintenanced.h
* Background worker run for each Citus-using database in a PostgreSQL
* cluster.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MAINTENANCED_H
#define MAINTENANCED_H
extern void InitializeMaintenanceDaemon(void);
extern void InitializeMaintenanceDaemonBackend(void);
extern void CitusMaintenanceDaemonMain(Datum main_arg);
#endif /* MAINTENANCED_H */

shared_library_init.h

@ -0,0 +1,16 @@
/*-------------------------------------------------------------------------
*
* shared_library_init.h
* Functionality related to the initialization of the Citus extension.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARED_LIBRARY_INIT_H
#define SHARED_LIBRARY_INIT_H
extern void StartupCitusBackend(void);
#endif /* SHARED_LIBRARY_INIT_H */

multi_extension.out

@ -7,6 +7,36 @@
-- not done yet.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 580000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 580000;
CREATE SCHEMA test;
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF;
END LOOP;
END;
$$;
-- check maintenance daemon is started
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | ?column? | ?column?
------------+----------+----------
regression | t | t
(1 row)
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,
@ -15,7 +45,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus');
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
count
-------
0
@ -96,7 +126,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus');
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
count
-------
0
@ -119,7 +149,6 @@ CREATE TABLE version_mismatch_table(column1 int);
\copy version_mismatch_table FROM STDIN;
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
column1
@ -177,13 +206,6 @@ DROP EXTENSION citus;
CREATE EXTENSION citus;
-- test cache invalidation in workers
\c - - - :worker_1_port
-- this will initialize the cache
\d
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '5.2-4';
@ -197,3 +219,70 @@ ALTER EXTENSION citus UPDATE;
--------+------+------+-------
(0 rows)
\c - - - :master_port
-- check that maintenance daemon gets (re-)started for the right user
DROP EXTENSION citus;
CREATE USER testuser SUPERUSER;
SET ROLE testuser;
CREATE EXTENSION citus;
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | ?column? | ?column?
------------+----------+----------
regression | t | t
(1 row)
-- and recreate as the right owner
RESET ROLE;
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- Check that maintenance daemon can also be started in another database
CREATE DATABASE another;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c another
CREATE EXTENSION citus;
CREATE SCHEMA test;
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF;
END LOOP;
END;
$$;
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
datname | ?column? | ?column?
---------+----------+----------
another | t | t
(1 row)
-- Test that database with active worker can be dropped. That'll
-- require killing the maintenance worker.
\c regression
SELECT datname,
pg_terminate_backend(pid)
FROM test.maintenance_worker('another');
datname | pg_terminate_backend
---------+----------------------
another | t
(1 row)
DROP DATABASE another;

multi_extension.sql

@ -10,6 +10,34 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 580000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 580000;
CREATE SCHEMA test;
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF;
END LOOP;
END;
$$;
-- check maintenance daemon is started
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,
@ -18,7 +46,8 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus');
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
-- DROP EXTENSION pre-created by the regression suite
DROP EXTENSION citus;
@ -94,7 +123,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus');
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
-- see incompatible version errors out
RESET citus.enable_version_checks;
@ -120,7 +149,7 @@ CREATE TABLE version_mismatch_table(column1 int);
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
@ -164,8 +193,6 @@ CREATE EXTENSION citus;
-- test cache invalidation in workers
\c - - - :worker_1_port
-- this will initialize the cache
\d
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '5.2-4';
@ -175,3 +202,62 @@ ALTER EXTENSION citus UPDATE;
-- if the cache is invalidated successfully, this \d should work without any problem
\d
\c - - - :master_port
-- check that maintenance daemon gets (re-)started for the right user
DROP EXTENSION citus;
CREATE USER testuser SUPERUSER;
SET ROLE testuser;
CREATE EXTENSION citus;
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
-- and recreate as the right owner
RESET ROLE;
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- Check that maintenance daemon can also be started in another database
CREATE DATABASE another;
\c another
CREATE EXTENSION citus;
CREATE SCHEMA test;
CREATE OR REPLACE FUNCTION test.maintenance_worker(p_dbname text DEFAULT current_database())
RETURNS pg_stat_activity
LANGUAGE plpgsql
AS $$
DECLARE
activity record;
BEGIN
LOOP
SELECT * INTO activity FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' AND datname = p_dbname;
IF activity.pid IS NOT NULL THEN
RETURN activity;
ELSE
PERFORM pg_sleep(0.1);
PERFORM pg_stat_clear_snapshot();
END IF;
END LOOP;
END;
$$;
SELECT datname,
datname = current_database(),
usename = (SELECT extowner::regrole::text FROM pg_extension WHERE extname = 'citus')
FROM test.maintenance_worker();
-- Test that database with active worker can be dropped. That'll
-- require killing the maintenance worker.
\c regression
SELECT datname,
pg_terminate_backend(pid)
FROM test.maintenance_worker('another');
DROP DATABASE another;