mirror of https://github.com/citusdata/citus.git
Improve the performance of CitusHasBeenLoaded function for a database that does not do CREATE EXTENSION citus but load citus.so. (#7123)
For a database that does not create the citus extension by running ` CREATE EXTENSION citus;` `CitusHasBeenLoaded ` function ends up querying the `pg_extension` table every time it is invoked. This is not an ideal situation for a such a database. The idea in this PR is as follows: ### A new field in MetadataCache. Add a new variable `extensionCreatedState `of the following type: ``` typedef enum ExtensionCreatedState { UNKNOWN = 0, CREATED = 1, NOTCREATED = 2, } ExtensionCreatedState; ``` When the MetadataCache is invalidated, `ExtensionCreatedState` will be set to UNKNOWN. ### Invalidate MetadataCache when CREATE/DROP/ALTER EXTENSION citus commands are run. - Register a callback function, named `InvalidateDistRelationCacheCallback`, for relcache invalidation during the shared library initialization for `citus.so`. This callback function is invoked in all the backends whenever the relcache is invalidated in one of the backends. (This could be caused many DDLs operations). - In the cache invalidation callback,` InvalidateDistRelationCacheCallback`, invalidate `MetadataCache` zeroing it out. - In `CitusHasBeenLoaded`, perform the costly citus is loaded check only if the `MetadataCache` is not valid. ### Downsides Any relcache invalidation (caused by various DDL operations) will case Citus MetadataCache to get invalidated. Most of the time it will be unnecessary. But we rely on that DDL operations on relations will not be too frequent.pull/7173/head
parent
1d540b60fb
commit
a849570f3f
|
@ -77,6 +77,7 @@
|
|||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
|
@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
|
||||
bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
|
||||
parsetree);
|
||||
|
||||
if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
|
||||
{
|
||||
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
|
||||
|
@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
PreprocessCreateExtensionStmtForCitusColumnar(parsetree);
|
||||
}
|
||||
|
||||
if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree))
|
||||
{
|
||||
/*
|
||||
* Citus maintains a higher level cache. We use the cache invalidation mechanism
|
||||
* of Postgres to achieve cache coherency between backends. Any change to citus
|
||||
* extension should be made known to other backends. We do this by invalidating the
|
||||
* relcache and therefore invoking the citus registered callback that invalidates
|
||||
* the citus cache in other backends.
|
||||
*/
|
||||
CacheInvalidateRelcacheAll();
|
||||
}
|
||||
|
||||
/*
|
||||
* Make sure that on DROP DATABASE we terminate the background daemon
|
||||
* associated with it.
|
||||
|
@ -926,15 +940,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
|
||||
{
|
||||
/*
|
||||
* Ensure value is valid, we can't do some checks during CREATE
|
||||
* EXTENSION. This is important to register some invalidation callbacks.
|
||||
*/
|
||||
CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -133,6 +133,19 @@ typedef struct ShardIdCacheEntry
|
|||
int shardIndex;
|
||||
} ShardIdCacheEntry;
|
||||
|
||||
/*
|
||||
* ExtensionCreatedState is used to track if citus extension has been created
|
||||
* using CREATE EXTENSION command.
|
||||
* UNKNOWN : MetadataCache is invalid. State is UNKNOWN.
|
||||
* CREATED : Citus is created.
|
||||
* NOTCREATED : Citus is not created.
|
||||
*/
|
||||
typedef enum ExtensionCreatedState
|
||||
{
|
||||
UNKNOWN = 0,
|
||||
CREATED = 1,
|
||||
NOTCREATED = 2,
|
||||
} ExtensionCreatedState;
|
||||
|
||||
/*
|
||||
* State which should be cleared upon DROP EXTENSION. When the configuration
|
||||
|
@ -140,7 +153,7 @@ typedef struct ShardIdCacheEntry
|
|||
*/
|
||||
typedef struct MetadataCacheData
|
||||
{
|
||||
bool extensionLoaded;
|
||||
ExtensionCreatedState extensionCreatedState;
|
||||
Oid distShardRelationId;
|
||||
Oid distPlacementRelationId;
|
||||
Oid distBackgroundJobRelationId;
|
||||
|
@ -288,7 +301,6 @@ static void CreateDistTableCache(void);
|
|||
static void CreateShardIdCache(void);
|
||||
static void CreateDistObjectCache(void);
|
||||
static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId);
|
||||
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
|
||||
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
|
||||
static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId);
|
||||
static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId);
|
||||
|
@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
|||
bool
|
||||
CitusHasBeenLoaded(void)
|
||||
{
|
||||
if (!MetadataCache.extensionLoaded || creating_extension)
|
||||
/*
|
||||
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
|
||||
* since the objects used by the C code might be not be there yet.
|
||||
*/
|
||||
if (creating_extension)
|
||||
{
|
||||
/*
|
||||
* Refresh if we have not determined whether the extension has been
|
||||
* loaded yet, or in case of ALTER EXTENSION since we want to treat
|
||||
* Citus as "not loaded" during ALTER EXTENSION citus.
|
||||
*/
|
||||
bool extensionLoaded = CitusHasBeenLoadedInternal();
|
||||
Oid citusExtensionOid = get_extension_oid("citus", true);
|
||||
|
||||
if (extensionLoaded && !MetadataCache.extensionLoaded)
|
||||
if (CurrentExtensionObject == citusExtensionOid)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If extensionCreatedState is UNKNOWN, query pg_extension for Citus
|
||||
* and cache the result. Otherwise return the value extensionCreatedState
|
||||
* indicates.
|
||||
*/
|
||||
if (MetadataCache.extensionCreatedState == UNKNOWN)
|
||||
{
|
||||
bool extensionCreated = CitusHasBeenLoadedInternal();
|
||||
|
||||
if (extensionCreated)
|
||||
{
|
||||
/*
|
||||
* Loaded Citus for the first time in this session, or first time after
|
||||
|
@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void)
|
|||
*/
|
||||
StartupCitusBackend();
|
||||
|
||||
/*
|
||||
* InvalidateDistRelationCacheCallback resets state such as extensionLoaded
|
||||
* when it notices changes to pg_dist_partition (which usually indicate
|
||||
* `DROP EXTENSION citus;` has been run)
|
||||
*
|
||||
* Ensure InvalidateDistRelationCacheCallback will notice those changes
|
||||
* by caching pg_dist_partition's oid.
|
||||
*
|
||||
* We skip these checks during upgrade since pg_dist_partition is not
|
||||
* present during early stages of upgrade operation.
|
||||
*/
|
||||
DistPartitionRelationId();
|
||||
|
||||
/*
|
||||
* This needs to be initialized so we can receive foreign relation graph
|
||||
* invalidation messages in InvalidateForeignRelationGraphCacheCallback().
|
||||
* See the comments of InvalidateForeignKeyGraph for more context.
|
||||
*/
|
||||
DistColocationRelationId();
|
||||
}
|
||||
|
||||
MetadataCache.extensionLoaded = extensionLoaded;
|
||||
MetadataCache.extensionCreatedState = CREATED;
|
||||
}
|
||||
else
|
||||
{
|
||||
MetadataCache.extensionCreatedState = NOTCREATED;
|
||||
}
|
||||
}
|
||||
|
||||
return MetadataCache.extensionLoaded;
|
||||
return (MetadataCache.extensionCreatedState == CREATED) ? true : false;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void)
|
|||
return false;
|
||||
}
|
||||
|
||||
if (creating_extension && CurrentExtensionObject == citusExtensionOid)
|
||||
{
|
||||
/*
|
||||
* We do not use Citus hooks during CREATE/ALTER EXTENSION citus
|
||||
* since the objects used by the C code might be not be there yet.
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
/* citus extension exists and has been created */
|
||||
return true;
|
||||
}
|
||||
|
@ -4201,10 +4209,6 @@ InitializeDistCache(void)
|
|||
CreateShardIdCache();
|
||||
|
||||
InitializeDistObjectCache();
|
||||
|
||||
/* Watch for invalidation events. */
|
||||
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
|
||||
(Datum) 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -4754,7 +4758,7 @@ InvalidateForeignKeyGraph(void)
|
|||
* InvalidateDistRelationCacheCallback flushes cache entries when a relation
|
||||
* is updated (or flushes the entire cache).
|
||||
*/
|
||||
static void
|
||||
void
|
||||
InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
|
||||
{
|
||||
/* invalidate either entire cache or a specific entry */
|
||||
|
@ -4762,12 +4766,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
|
|||
{
|
||||
InvalidateDistTableCache();
|
||||
InvalidateDistObjectCache();
|
||||
InvalidateMetadataSystemCache();
|
||||
}
|
||||
else
|
||||
{
|
||||
void *hashKey = (void *) &relationId;
|
||||
bool foundInCache = false;
|
||||
|
||||
if (DistTableCacheHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
CitusTableCacheEntrySlot *cacheSlot =
|
||||
hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache);
|
||||
if (foundInCache)
|
||||
|
@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
|
|||
}
|
||||
|
||||
/*
|
||||
* If pg_dist_partition is being invalidated drop all state
|
||||
* This happens pretty rarely, but most importantly happens during
|
||||
* DROP EXTENSION citus; This isn't the only time when this happens
|
||||
* though, it can happen for multiple other reasons, such as an
|
||||
* autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE
|
||||
* wouldn't really need a full Metadata cache invalidation, but we
|
||||
* don't know how to differentiate between DROP EXTENSION and ANALYZE.
|
||||
* So for now we simply drop it in both cases and take the slight
|
||||
* temporary performance hit.
|
||||
* if pg_dist_partition relcache is invalidated for some reason,
|
||||
* invalidate the MetadataCache. It is likely an overkill to invalidate
|
||||
* the entire cache here. But until a better fix, we keep it this way
|
||||
* for postgres regression tests that includes
|
||||
* REINDEX SCHEMA CONCURRENTLY pg_catalog
|
||||
* command.
|
||||
*/
|
||||
if (relationId == MetadataCache.distPartitionRelationId)
|
||||
{
|
||||
InvalidateMetadataSystemCache();
|
||||
}
|
||||
|
||||
|
||||
if (relationId == MetadataCache.distObjectRelationId)
|
||||
{
|
||||
InvalidateDistObjectCache();
|
||||
|
@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void)
|
|||
CitusTableCacheEntrySlot *cacheSlot = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
if (DistTableCacheHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
hash_seq_init(&status, DistTableCacheHash);
|
||||
|
||||
while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL)
|
||||
|
@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void)
|
|||
DistObjectCacheEntry *cacheEntry = NULL;
|
||||
HASH_SEQ_STATUS status;
|
||||
|
||||
if (DistObjectCacheHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
hash_seq_init(&status, DistObjectCacheHash);
|
||||
|
||||
while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL)
|
||||
|
@ -4930,8 +4948,8 @@ CreateDistObjectCache(void)
|
|||
|
||||
|
||||
/*
|
||||
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
|
||||
* and invalidates the worker node, ConnParams, and local group ID caches.
|
||||
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState
|
||||
* flag and invalidates the worker node, ConnParams, and local group ID caches.
|
||||
*/
|
||||
void
|
||||
InvalidateMetadataSystemCache(void)
|
||||
|
|
|
@ -109,6 +109,8 @@
|
|||
#include "tcop/tcopprot.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/guc_tables.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/varlena.h"
|
||||
|
||||
|
@ -554,6 +556,9 @@ _PG_init(void)
|
|||
"ColumnarSupportsIndexAM",
|
||||
true, &handle);
|
||||
|
||||
CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback,
|
||||
(Datum) 0);
|
||||
|
||||
INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr);
|
||||
INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable);
|
||||
INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions);
|
||||
|
|
|
@ -137,6 +137,8 @@ typedef enum
|
|||
ANY_CITUS_TABLE_TYPE
|
||||
} CitusTableType;
|
||||
|
||||
void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
|
||||
|
||||
extern List * AllCitusTableIds(void);
|
||||
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
|
||||
extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry);
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
import psycopg
|
||||
import pytest
|
||||
|
||||
|
||||
def test_create_drop_citus(coord):
|
||||
with coord.cur() as cur1:
|
||||
with coord.cur() as cur2:
|
||||
# Conn1 drops the extension
|
||||
# and Conn2 cannot use it.
|
||||
cur1.execute("DROP EXTENSION citus")
|
||||
|
||||
with pytest.raises(psycopg.errors.UndefinedFunction):
|
||||
# Conn1 dropped the extension. citus_version udf
|
||||
# cannot be found.sycopg.errors.UndefinedFunction
|
||||
# is expected here.
|
||||
cur2.execute("SELECT citus_version();")
|
||||
|
||||
# Conn2 creates the extension,
|
||||
# Conn1 is able to use it immediadtely.
|
||||
cur2.execute("CREATE EXTENSION citus")
|
||||
cur1.execute("SELECT citus_version();")
|
||||
cur1.execute("DROP EXTENSION citus;")
|
||||
|
||||
with coord.cur() as cur1:
|
||||
with coord.cur() as cur2:
|
||||
# A connection is able to create and use the extension
|
||||
# within a transaction block.
|
||||
cur1.execute("BEGIN;")
|
||||
cur1.execute("CREATE TABLE t1(id int);")
|
||||
cur1.execute("CREATE EXTENSION citus;")
|
||||
cur1.execute("SELECT create_reference_table('t1')")
|
||||
cur1.execute("ABORT;")
|
||||
|
||||
# Conn1 aborted so Conn2 is be able to create and
|
||||
# use the extension within a transaction block.
|
||||
cur2.execute("BEGIN;")
|
||||
cur2.execute("CREATE TABLE t1(id int);")
|
||||
cur2.execute("CREATE EXTENSION citus;")
|
||||
cur2.execute("SELECT create_reference_table('t1')")
|
||||
cur2.execute("COMMIT;")
|
||||
|
||||
# Conn2 commited so Conn1 is be able to use the
|
||||
# extension immediately.
|
||||
cur1.execute("SELECT citus_version();")
|
Loading…
Reference in New Issue