Only error out on distributed queries when there is version mismatch

Before this commit, we were erroring out at almost all queries if there is a
version mismatch. With this commit, we started to error out only requested
operation touches distributed tables.

Normally we would need to use distributed cache to understand whether a table
is distributed or not. However, it is not safe to read our metadata tables when
there is a version mismatch, thus it is not safe to create distributed cache.
Therefore for this specific occasion, we directly read from pg_dist_partition
table. However; reading from catalog is costly and we should not use this
method in other places as much as possible.
pull/1425/head
Burak Yucesoy 2017-05-19 19:10:07 +03:00
parent acb0d23717
commit eea8c51e1f
3 changed files with 184 additions and 152 deletions

View File

@ -99,7 +99,6 @@ struct DropRelationCallbackState
/* Local functions forward declarations for deciding when to perform processing/checks */ /* Local functions forward declarations for deciding when to perform processing/checks */
static bool SkipCitusProcessingForUtility(Node *parsetree);
static bool IsCitusExtensionStmt(Node *parsetree); static bool IsCitusExtensionStmt(Node *parsetree);
/* Local functions forward declarations for Transmit statement */ /* Local functions forward declarations for Transmit statement */
@ -185,23 +184,29 @@ multi_ProcessUtility(Node *parsetree,
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
List *ddlJobs = NIL; List *ddlJobs = NIL;
bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree); bool checkExtensionVersion = false;
if (skipCitusProcessing) if (IsA(parsetree, TransactionStmt))
{ {
bool checkExtensionVersion = IsCitusExtensionStmt(parsetree); /*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
if (EnableVersionChecks && checkExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
return; return;
} }
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
if (EnableVersionChecks && checkExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
/* /*
@ -447,63 +452,6 @@ multi_ProcessUtility(Node *parsetree,
} }
/*
* SkipCitusProcessingForUtility simply returns whether a given utility should
* bypass Citus processing and checks and be handled exclusively by standard
* PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION,
* ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus.
*/
static bool
SkipCitusProcessingForUtility(Node *parsetree)
{
switch (parsetree->type)
{
/*
* In the CitusHasBeenLoaded check, we compare versions of loaded code,
* the installed extension, and available extension. If they differ, we
* force user to execute ALTER EXTENSION citus UPDATE. To allow this,
* CREATE/DROP/ALTER extension must be omitted from Citus processing.
*/
case T_DropStmt:
{
DropStmt *dropStatement = (DropStmt *) parsetree;
if (dropStatement->removeType != OBJECT_EXTENSION)
{
return false;
}
}
/* no break, fall through */
case T_CreateExtensionStmt:
case T_AlterExtensionStmt:
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
case T_TransactionStmt:
/*
* Skip processing of variable set statements, to allow changing the
* enable_version_checks GUC during testing.
*/
case T_VariableSetStmt:
{
return true;
}
default:
{
return false;
}
}
}
/* /*
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER * IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
* EXTENSION statement which references the citus extension. This function * EXTENSION statement which references the citus extension. This function
@ -1474,8 +1422,8 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
* from the citus.control file. In case a new default is available, we * from the citus.control file. In case a new default is available, we
* will force a compatibility check of the latest available version. * will force a compatibility check of the latest available version.
*/ */
availableExtensionVersion = NULL; citusVersionKnownCompatible = false;
ErrorIfAvailableVersionMismatch(); CheckCitusVersion(ERROR);
} }
} }

View File

@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid;
/* Citus extension version variables */ /* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */ bool EnableVersionChecks = true; /* version checks are enabled */
char *availableExtensionVersion = NULL; bool citusVersionKnownCompatible = false;
static char *installedExtensionVersion = NULL;
/* Hash table for informations about each partition */ /* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL; static HTAB *DistTableCacheHash = NULL;
@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1];
/* local function forward declarations */ /* local function forward declarations */
static bool IsDistributedTableViaCatalog(Oid relationId);
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
@ -142,7 +142,8 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength); int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount); int shardCount);
static void ErrorIfInstalledVersionMismatch(void); static bool CheckInstalledVersion(int elevel);
static bool CheckAvailableVersion(int elevel);
static char * AvailableExtensionVersion(void); static char * AvailableExtensionVersion(void);
static char * InstalledExtensionVersion(void); static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
@ -184,22 +185,59 @@ IsDistributedTable(Oid relationId)
{ {
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
cacheEntry = LookupDistTableCacheEntry(relationId);
/* /*
* Can't be a distributed relation if the extension hasn't been loaded * If extension hasn't been created, or has the wrong version and the
* yet. As we can't do lookups in nonexistent tables, directly return * table isn't a distributed one, LookupDistTableCacheEntry() will return NULL.
* false.
*/ */
if (!CitusHasBeenLoaded()) if (!cacheEntry)
{ {
return false; return false;
} }
cacheEntry = LookupDistTableCacheEntry(relationId);
return cacheEntry->isDistributedTable; return cacheEntry->isDistributedTable;
} }
/*
* IsDistributedTableViaCatalog returns whether the given relation is a
* distributed table or not.
*
* It does so by searching pg_dist_partition, explicitly bypassing caches,
* because this function is designed to be used in cases where accessing
* metadata tables is not safe.
*
* NB: Currently this still hardcodes pg_dist_partition logicalrelid column
* offset and the corresponding index. If we ever come close to changing
* that, we'll have to work a bit harder.
*/
static bool
IsDistributedTableViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = NULL;
SysScanDesc scanDescriptor = NULL;
const int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
partitionTuple = systable_getnext(scanDescriptor);
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, AccessShareLock);
return HeapTupleIsValid(partitionTuple);
}
/* /*
* DistributedTableList returns a list that includes all the valid distributed table * DistributedTableList returns a list that includes all the valid distributed table
* cache entries. * cache entries.
@ -211,6 +249,8 @@ DistributedTableList(void)
List *distributedTableList = NIL; List *distributedTableList = NIL;
ListCell *distTableOidCell = NULL; ListCell *distTableOidCell = NULL;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* first, we need to iterate over pg_dist_partition */ /* first, we need to iterate over pg_dist_partition */
distTableOidList = DistTableOidList(); distTableOidList = DistTableOidList();
@ -360,6 +400,8 @@ LookupShardCacheEntry(int64 shardId)
bool foundInCache = false; bool foundInCache = false;
bool recheck = false; bool recheck = false;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* probably not reachable */ /* probably not reachable */
if (DistShardCacheHash == NULL) if (DistShardCacheHash == NULL)
{ {
@ -435,27 +477,16 @@ DistributedTableCacheEntry(Oid distributedRelationId)
{ {
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
cacheEntry = LookupDistTableCacheEntry(distributedRelationId); cacheEntry = LookupDistTableCacheEntry(distributedRelationId);
if (cacheEntry->isDistributedTable) if (cacheEntry && cacheEntry->isDistributedTable)
{ {
return cacheEntry; return cacheEntry;
} }
else else
{ {
char *relationName = get_rel_name(distributedRelationId); char *relationName = get_rel_name(distributedRelationId);
ereport(ERROR, (errmsg("relation %s is not distributed", ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
relationName)));
} }
} }
@ -471,6 +502,43 @@ LookupDistTableCacheEntry(Oid relationId)
bool foundInCache = false; bool foundInCache = false;
void *hashKey = (void *) &relationId; void *hashKey = (void *) &relationId;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
/*
* If the version is not known to be compatible, perform thorough check,
* unless such checks are disabled.
*/
if (!citusVersionKnownCompatible && EnableVersionChecks)
{
bool isDistributed = IsDistributedTableViaCatalog(relationId);
int reportLevel = DEBUG1;
/*
* If there's a version-mismatch, and we're dealing with a distributed
* table, we have to error out as we can't return a valid entry. We
* want to check compatibility in the non-distributed case as well, so
* future lookups can use the cache if compatible.
*/
if (isDistributed)
{
reportLevel = ERROR;
}
if (!CheckCitusVersion(reportLevel))
{
/* incompatible, can't access cache, so return before doing so */
return NULL;
}
}
if (DistTableCacheHash == NULL) if (DistTableCacheHash == NULL)
{ {
InitializeDistTableCache(); InitializeDistTableCache();
@ -1066,86 +1134,114 @@ CitusHasBeenLoaded(void)
DistPartitionRelationId(); DistPartitionRelationId();
/* /*
* We also set installedExtensionVersion to NULL so that it will be re-read * We also reset citusVersionKnownCompatible, so it will be re-read in
* in case of extension update. * case of extension update.
*/ */
installedExtensionVersion = NULL; citusVersionKnownCompatible = false;
} }
} }
if (extensionLoaded)
{
ErrorIfAvailableVersionMismatch();
ErrorIfInstalledVersionMismatch();
}
return extensionLoaded; return extensionLoaded;
} }
/* /*
* ErrorIfAvailableExtensionVersionMismatch compares CITUS_EXTENSIONVERSION and * CheckCitusVersion checks whether there is a version mismatch between the
* currently available version from citus.control file. If they are not same in * available version and the loaded version or between the installed version
* major or minor version numbers, this function errors out. It ignores the schema * and the loaded version. Returns true if compatible, false otherwise.
* version. *
* As a side effect, this function also sets citusVersionKnownCompatible global
* variable to true which reduces version check cost of next calls.
*/ */
void bool
ErrorIfAvailableVersionMismatch(void) CheckCitusVersion(int elevel)
{
if (citusVersionKnownCompatible ||
!CitusHasBeenLoaded() ||
!EnableVersionChecks)
{
return true;
}
if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel))
{
citusVersionKnownCompatible = true;
return true;
}
else
{
return false;
}
}
/*
* CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently
* available version from the citus.control file. If they are not compatible,
* this function logs an error with the specified elevel and returns false,
* otherwise it returns true.
*/
static bool
CheckAvailableVersion(int elevel)
{ {
char *availableVersion = NULL; char *availableVersion = NULL;
if (!EnableVersionChecks) Assert(CitusHasBeenLoaded());
{ Assert(EnableVersionChecks);
return;
}
availableVersion = AvailableExtensionVersion(); availableVersion = AvailableExtensionVersion();
if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION)) if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION))
{ {
ereport(ERROR, (errmsg("loaded Citus library version differs from latest " ereport(elevel, (errmsg("loaded Citus library version differs from latest "
"available extension version"), "available extension version"),
errdetail("Loaded library requires %s, but the latest control " errdetail("Loaded library requires %s, but the latest control "
"file specifies %s.", CITUS_MAJORVERSION, "file specifies %s.", CITUS_MAJORVERSION,
availableVersion), availableVersion),
errhint("Restart the database to load the latest Citus " errhint("Restart the database to load the latest Citus "
"library."))); "library.")));
return false;
} }
return true;
} }
/* /*
* ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and currently * CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the the
* and catalog version from pg_extemsion catalog table. If they are not same in * extension's current version from the pg_extemsion catalog table. If they
* major or minor version numbers, this function errors out. It ignores the schema * are not compatible, this function logs an error with the specified elevel,
* version. * otherwise it returns true.
*/ */
static void static bool
ErrorIfInstalledVersionMismatch(void) CheckInstalledVersion(int elevel)
{ {
char *installedVersion = NULL; char *installedVersion = NULL;
if (!EnableVersionChecks) Assert(CitusHasBeenLoaded());
{ Assert(EnableVersionChecks);
return;
}
installedVersion = InstalledExtensionVersion(); installedVersion = InstalledExtensionVersion();
if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION)) if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION))
{ {
ereport(ERROR, (errmsg("loaded Citus library version differs from installed " ereport(elevel, (errmsg("loaded Citus library version differs from installed "
"extension version"), "extension version"),
errdetail("Loaded library requires %s, but the installed " errdetail("Loaded library requires %s, but the installed "
"extension version is %s.", CITUS_MAJORVERSION, "extension version is %s.", CITUS_MAJORVERSION,
installedVersion), installedVersion),
errhint("Run ALTER EXTENSION citus UPDATE and try again."))); errhint("Run ALTER EXTENSION citus UPDATE and try again.")));
return false;
} }
return true;
} }
/* /*
* MajorVersionsCompatible compares given two versions. If they are same in major * MajorVersionsCompatible checks whether both versions are compatible. They
* and minor version numbers, this function returns true. It ignores the schema * are if major and minor version numbers match, the schema version is
* version. * ignored. Returns true if compatible, false otherwise.
*/ */
bool bool
MajorVersionsCompatible(char *leftVersion, char *rightVersion) MajorVersionsCompatible(char *leftVersion, char *rightVersion)
@ -1203,12 +1299,7 @@ AvailableExtensionVersion(void)
bool hasTuple = false; bool hasTuple = false;
bool goForward = true; bool goForward = true;
bool doCopy = false; bool doCopy = false;
char *availableExtensionVersion;
/* if we cached the result before, return it*/
if (availableExtensionVersion != NULL)
{
return availableExtensionVersion;
}
estate = CreateExecutorState(); estate = CreateExecutorState();
extensionsResultSet = makeNode(ReturnSetInfo); extensionsResultSet = makeNode(ReturnSetInfo);
@ -1274,8 +1365,6 @@ AvailableExtensionVersion(void)
/* /*
* InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table. * InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table.
* It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion
* will not read the catalog tables again.
*/ */
static char * static char *
InstalledExtensionVersion(void) InstalledExtensionVersion(void)
@ -1284,12 +1373,7 @@ InstalledExtensionVersion(void)
SysScanDesc scandesc; SysScanDesc scandesc;
ScanKeyData entry[1]; ScanKeyData entry[1];
HeapTuple extensionTuple = NULL; HeapTuple extensionTuple = NULL;
char *installedExtensionVersion = NULL;
/* if we cached the result before, return it*/
if (installedExtensionVersion != NULL)
{
return installedExtensionVersion;
}
relation = heap_open(ExtensionRelationId, AccessShareLock); relation = heap_open(ExtensionRelationId, AccessShareLock);

View File

@ -18,7 +18,7 @@
#include "utils/hsearch.h" #include "utils/hsearch.h"
extern bool EnableVersionChecks; extern bool EnableVersionChecks;
extern char *availableExtensionVersion; extern bool citusVersionKnownCompatible;
/* /*
* Representation of a table's metadata that is frequently used for * Representation of a table's metadata that is frequently used for
@ -79,7 +79,7 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern bool CitusHasBeenLoaded(void); extern bool CitusHasBeenLoaded(void);
void ErrorIfAvailableVersionMismatch(void); extern bool CheckCitusVersion(int elevel);
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
/* access WorkerNodeHash */ /* access WorkerNodeHash */