Merge branch 'master' into better_comment_for_tests

pull/1426/head
Önder Kalacı 2017-05-22 10:58:21 +03:00 committed by GitHub
commit 757f5be858
32 changed files with 467 additions and 175 deletions

View File

@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
bool requireEmpty = true;
EnsureCoordinator();
CheckCitusVersion(ERROR);
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
{
@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
char *colocateWithTableName = NULL;
EnsureCoordinator();
CheckCitusVersion(ERROR);
/* guard against a binary update without a function update */
if (PG_NARGS() >= 4)
@ -233,13 +235,17 @@ static void
CreateReferenceTable(Oid relationId)
{
uint32 colocationId = INVALID_COLOCATION_ID;
List *workerNodeList = ActiveWorkerNodeList();
int replicationFactor = list_length(workerNodeList);
List *workerNodeList = NIL;
int replicationFactor = 0;
char *distributionColumnName = NULL;
bool requireEmpty = true;
char relationKind = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
workerNodeList = ActiveWorkerNodeList();
replicationFactor = list_length(workerNodeList);
/* if there are no workers, error out */
if (replicationFactor == 0)

View File

@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
char *tableName = text_to_cstring(tableNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);

View File

@ -99,7 +99,6 @@ struct DropRelationCallbackState
/* Local functions forward declarations for deciding when to perform processing/checks */
static bool SkipCitusProcessingForUtility(Node *parsetree);
static bool IsCitusExtensionStmt(Node *parsetree);
/* Local functions forward declarations for Transmit statement */
@ -185,22 +184,28 @@ multi_ProcessUtility(Node *parsetree,
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
List *ddlJobs = NIL;
bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree);
bool checkExtensionVersion = false;
if (skipCitusProcessing)
if (IsA(parsetree, TransactionStmt))
{
bool checkExtensionVersion = IsCitusExtensionStmt(parsetree);
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
return;
}
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
if (EnableVersionChecks && checkExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
return;
}
if (!CitusHasBeenLoaded())
{
@ -447,63 +452,6 @@ multi_ProcessUtility(Node *parsetree,
}
/*
* SkipCitusProcessingForUtility simply returns whether a given utility should
* bypass Citus processing and checks and be handled exclusively by standard
* PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION,
* ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus.
*/
static bool
SkipCitusProcessingForUtility(Node *parsetree)
{
switch (parsetree->type)
{
/*
* In the CitusHasBeenLoaded check, we compare versions of loaded code,
* the installed extension, and available extension. If they differ, we
* force user to execute ALTER EXTENSION citus UPDATE. To allow this,
* CREATE/DROP/ALTER extension must be omitted from Citus processing.
*/
case T_DropStmt:
{
DropStmt *dropStatement = (DropStmt *) parsetree;
if (dropStatement->removeType != OBJECT_EXTENSION)
{
return false;
}
}
/* no break, fall through */
case T_CreateExtensionStmt:
case T_AlterExtensionStmt:
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
case T_TransactionStmt:
/*
* Skip processing of variable set statements, to allow changing the
* enable_version_checks GUC during testing.
*/
case T_VariableSetStmt:
{
return true;
}
default:
{
return false;
}
}
}
/*
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
* EXTENSION statement which references the citus extension. This function
@ -1474,8 +1422,8 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
* from the citus.control file. In case a new default is available, we
* will force a compatibility check of the latest available version.
*/
availableExtensionVersion = NULL;
ErrorIfAvailableVersionMismatch();
citusVersionKnownCompatible = false;
CheckCitusVersion(ERROR);
}
}

View File

@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS)
int commandIndex = 0;
int commandCount = 0;
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
{

View File

@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
Oid distributedTableId = ResolveRelationId(tableNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);

View File

@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
bool failOK = false;
EnsureCoordinator();
CheckCitusVersion(ERROR);
queryTreeNode = ParseTreeNode(queryString);
if (!IsA(queryTreeNode, DeleteStmt))
@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
StringInfo dropSeqCommand = makeStringInfo();
bool coordinator = IsCoordinator();
CheckCitusVersion(ERROR);
/* do nothing if DDL propagation is switched off or this is not the coordinator */
if (!EnableDDLPropagation || !coordinator)
{

View File

@ -46,14 +46,21 @@ Datum
master_expire_table_cache(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *workerNodeList = ActiveWorkerNodeList();
DistTableCacheEntry *cacheEntry = NULL;
List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL;
int shardCount = cacheEntry->shardIntervalArrayLength;
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
int shardCount = 0;
ShardInterval **shardIntervalArray = NULL;
List **placementListArray = NULL;
int shardIndex = 0;
CheckCitusVersion(ERROR);
cacheEntry = DistributedTableCacheEntry(relationId);
workerNodeList = ActiveWorkerNodeList();
shardCount = cacheEntry->shardIntervalArrayLength;
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
if (shardCount == 0)
{
ereport(WARNING, (errmsg("Table has no shards, no action is taken")));

View File

@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 totalRelationSize = 0;
CheckCitusVersion(ERROR);
totalRelationSize = DistributedTableSize(relationId,
PG_TOTAL_RELATION_SIZE_FUNCTION);
@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 tableSize = 0;
CheckCitusVersion(ERROR);
tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION);
PG_RETURN_INT64(tableSize);
@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 relationSize = 0;
CheckCitusVersion(ERROR);
relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION);
PG_RETURN_INT64(relationSize);

View File

@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
int32 affectedTupleCount = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
queryTreeNode = ParseTreeNode(queryString);
if (IsA(queryTreeNode, DeleteStmt))

View File

@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
Datum values[TABLE_METADATA_FIELDS];
bool isNulls[TABLE_METADATA_FIELDS];
CheckCitusVersion(ERROR);
/* find partition tuple for partitioned relation */
partitionEntry = DistributedTableCacheEntry(relationId);
@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
FuncCallContext *functionContext = NULL;
ListCell *tableDDLEventCell = NULL;
CheckCitusVersion(ERROR);
/*
* On the very first call to this function, we first use the given relation
* name to get to the relation. We then recreate the list of DDL statements
@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
Datum shardIdDatum = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
shardId = GetNextShardId();
shardIdDatum = Int64GetDatum(shardId);
@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
Datum placementIdDatum = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
placementId = GetNextPlacementId();
placementIdDatum = Int64GetDatum(placementId);
@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
uint32 workerNodeIndex = 0;
uint32 workerNodeCount = 0;
CheckCitusVersion(ERROR);
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldContext = NULL;

View File

@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
EnsureCoordinator();
CheckCitusVersion(ERROR);
/* RepairShardPlacement function repairs only given shard */
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,

View File

@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS)
FmgrInfo *hashFunction = NULL;
Oid valueDataType = InvalidOid;
CheckCitusVersion(ERROR);
/* figure out hash function from the data type */
valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0);
typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO);

View File

@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
{
text *relationNameText = PG_GETARG_TEXT_P(0);
char *relationName = text_to_cstring(relationNameText);
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = NIL;
uint64 shardId = INVALID_SHARD_ID;
List *ddlEventList = NULL;
uint32 attemptableNodeCount = 0;
@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char replicationModel = REPLICATION_MODEL_INVALID;
bool includeSequenceDefaults = false;
CheckCitusVersion(ERROR);
workerNodeList = ActiveWorkerNodeList();
EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId);
@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
float4 shardFillLevel = 0.0;
char partitionMethod = 0;
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
bool cstoreTable = CStoreTable(relationId);
ShardInterval *shardInterval = NULL;
Oid relationId = InvalidOid;
bool cstoreTable = false;
char storageType = shardInterval->storageType;
char storageType = 0;
CheckCitusVersion(ERROR);
shardInterval = LoadShardInterval(shardId);
relationId = shardInterval->relationId;
cstoreTable = CStoreTable(relationId);
storageType = shardInterval->storageType;
EnsureTablePermissions(relationId, ACL_INSERT);
@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS)
int64 shardId = PG_GETARG_INT64(0);
uint64 shardSize = 0;
CheckCitusVersion(ERROR);
shardSize = UpdateShardStatistics(shardId);
PG_RETURN_INT64(shardSize);

View File

@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureCoordinator();
EnsureSuperUser();
CheckCitusVersion(ERROR);
PreventTransactionChain(true, "start_metadata_sync_to_node");
@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureCoordinator();
EnsureSuperUser();
CheckCitusVersion(ERROR);
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)

View File

@ -28,6 +28,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "distributed/metadata_cache.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "mb/pg_wchar.h"
@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS)
errmsg("shard_id cannot be null")));
}
relationId = PG_GETARG_OID(0);
shardId = PG_GETARG_INT64(1);
CheckCitusVersion(ERROR);
if (shardId <= 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

View File

@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
{
int recoveredTransactionCount = 0;
CheckCitusVersion(ERROR);
recoveredTransactionCount = RecoverPreparedTransactions();
PG_RETURN_INT32(recoveredTransactionCount);

View File

@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
}
EnsureCoordinator();
CheckCitusVersion(ERROR);
relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);

View File

@ -17,6 +17,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "distributed/distribution_column.h"
#include "distributed/metadata_cache.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS)
char *columnNodeString = NULL;
text *columnNodeText = NULL;
CheckCitusVersion(ERROR);
relation = relation_open(relationId, AccessShareLock);
column = BuildDistributionKeyFromColumnName(relation, columnName);
@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS)
char *columnName = NULL;
text *columnText = NULL;
CheckCitusVersion(ERROR);
columnName = ColumnNameToColumn(relationId, columnNodeString);
columnText = cstring_to_text(columnName);

View File

@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid;
/* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */
char *availableExtensionVersion = NULL;
static char *installedExtensionVersion = NULL;
bool citusVersionKnownCompatible = false;
/* Hash table for informations about each partition */
static HTAB *DistTableCacheHash = NULL;
@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1];
/* local function forward declarations */
static bool IsDistributedTableViaCatalog(Oid relationId);
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
@ -142,7 +142,8 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static void ErrorIfInstalledVersionMismatch(void);
static bool CheckInstalledVersion(int elevel);
static bool CheckAvailableVersion(int elevel);
static char * AvailableExtensionVersion(void);
static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
@ -184,22 +185,59 @@ IsDistributedTable(Oid relationId)
{
DistTableCacheEntry *cacheEntry = NULL;
cacheEntry = LookupDistTableCacheEntry(relationId);
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return
* false.
* If extension hasn't been created, or has the wrong version and the
* table isn't a distributed one, LookupDistTableCacheEntry() will return NULL.
*/
if (!CitusHasBeenLoaded())
if (!cacheEntry)
{
return false;
}
cacheEntry = LookupDistTableCacheEntry(relationId);
return cacheEntry->isDistributedTable;
}
/*
* IsDistributedTableViaCatalog returns whether the given relation is a
* distributed table or not.
*
* It does so by searching pg_dist_partition, explicitly bypassing caches,
* because this function is designed to be used in cases where accessing
* metadata tables is not safe.
*
* NB: Currently this still hardcodes pg_dist_partition logicalrelid column
* offset and the corresponding index. If we ever come close to changing
* that, we'll have to work a bit harder.
*/
static bool
IsDistributedTableViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = NULL;
SysScanDesc scanDescriptor = NULL;
const int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
partitionTuple = systable_getnext(scanDescriptor);
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, AccessShareLock);
return HeapTupleIsValid(partitionTuple);
}
/*
* DistributedTableList returns a list that includes all the valid distributed table
* cache entries.
@ -211,6 +249,8 @@ DistributedTableList(void)
List *distributedTableList = NIL;
ListCell *distTableOidCell = NULL;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* first, we need to iterate over pg_dist_partition */
distTableOidList = DistTableOidList();
@ -360,6 +400,8 @@ LookupShardCacheEntry(int64 shardId)
bool foundInCache = false;
bool recheck = false;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* probably not reachable */
if (DistShardCacheHash == NULL)
{
@ -435,27 +477,16 @@ DistributedTableCacheEntry(Oid distributedRelationId)
{
DistTableCacheEntry *cacheEntry = NULL;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
cacheEntry = LookupDistTableCacheEntry(distributedRelationId);
if (cacheEntry->isDistributedTable)
if (cacheEntry && cacheEntry->isDistributedTable)
{
return cacheEntry;
}
else
{
char *relationName = get_rel_name(distributedRelationId);
ereport(ERROR, (errmsg("relation %s is not distributed",
relationName)));
ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
}
}
@ -471,6 +502,43 @@ LookupDistTableCacheEntry(Oid relationId)
bool foundInCache = false;
void *hashKey = (void *) &relationId;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
/*
* If the version is not known to be compatible, perform thorough check,
* unless such checks are disabled.
*/
if (!citusVersionKnownCompatible && EnableVersionChecks)
{
bool isDistributed = IsDistributedTableViaCatalog(relationId);
int reportLevel = DEBUG1;
/*
* If there's a version-mismatch, and we're dealing with a distributed
* table, we have to error out as we can't return a valid entry. We
* want to check compatibility in the non-distributed case as well, so
* future lookups can use the cache if compatible.
*/
if (isDistributed)
{
reportLevel = ERROR;
}
if (!CheckCitusVersion(reportLevel))
{
/* incompatible, can't access cache, so return before doing so */
return NULL;
}
}
if (DistTableCacheHash == NULL)
{
InitializeDistTableCache();
@ -1066,86 +1134,114 @@ CitusHasBeenLoaded(void)
DistPartitionRelationId();
/*
* We also set installedExtensionVersion to NULL so that it will be re-read
* in case of extension update.
* We also reset citusVersionKnownCompatible, so it will be re-read in
* case of extension update.
*/
installedExtensionVersion = NULL;
citusVersionKnownCompatible = false;
}
}
if (extensionLoaded)
{
ErrorIfAvailableVersionMismatch();
ErrorIfInstalledVersionMismatch();
}
return extensionLoaded;
}
/*
* ErrorIfAvailableExtensionVersionMismatch compares CITUS_EXTENSIONVERSION and
* currently available version from citus.control file. If they are not same in
* major or minor version numbers, this function errors out. It ignores the schema
* version.
* CheckCitusVersion checks whether there is a version mismatch between the
* available version and the loaded version or between the installed version
* and the loaded version. Returns true if compatible, false otherwise.
*
* As a side effect, this function also sets citusVersionKnownCompatible global
* variable to true which reduces version check cost of next calls.
*/
void
ErrorIfAvailableVersionMismatch(void)
bool
CheckCitusVersion(int elevel)
{
if (citusVersionKnownCompatible ||
!CitusHasBeenLoaded() ||
!EnableVersionChecks)
{
return true;
}
if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel))
{
citusVersionKnownCompatible = true;
return true;
}
else
{
return false;
}
}
/*
* CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently
* available version from the citus.control file. If they are not compatible,
* this function logs an error with the specified elevel and returns false,
* otherwise it returns true.
*/
static bool
CheckAvailableVersion(int elevel)
{
char *availableVersion = NULL;
if (!EnableVersionChecks)
{
return;
}
Assert(CitusHasBeenLoaded());
Assert(EnableVersionChecks);
availableVersion = AvailableExtensionVersion();
if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION))
{
ereport(ERROR, (errmsg("loaded Citus library version differs from latest "
ereport(elevel, (errmsg("loaded Citus library version differs from latest "
"available extension version"),
errdetail("Loaded library requires %s, but the latest control "
"file specifies %s.", CITUS_MAJORVERSION,
availableVersion),
errhint("Restart the database to load the latest Citus "
"library.")));
return false;
}
return true;
}
/*
* ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and currently
* and catalog version from pg_extemsion catalog table. If they are not same in
* major or minor version numbers, this function errors out. It ignores the schema
* version.
* CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the the
* extension's current version from the pg_extemsion catalog table. If they
* are not compatible, this function logs an error with the specified elevel,
* otherwise it returns true.
*/
static void
ErrorIfInstalledVersionMismatch(void)
static bool
CheckInstalledVersion(int elevel)
{
char *installedVersion = NULL;
if (!EnableVersionChecks)
{
return;
}
Assert(CitusHasBeenLoaded());
Assert(EnableVersionChecks);
installedVersion = InstalledExtensionVersion();
if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION))
{
ereport(ERROR, (errmsg("loaded Citus library version differs from installed "
ereport(elevel, (errmsg("loaded Citus library version differs from installed "
"extension version"),
errdetail("Loaded library requires %s, but the installed "
"extension version is %s.", CITUS_MAJORVERSION,
installedVersion),
errhint("Run ALTER EXTENSION citus UPDATE and try again.")));
return false;
}
return true;
}
/*
* MajorVersionsCompatible compares given two versions. If they are same in major
* and minor version numbers, this function returns true. It ignores the schema
* version.
* MajorVersionsCompatible checks whether both versions are compatible. They
* are if major and minor version numbers match, the schema version is
* ignored. Returns true if compatible, false otherwise.
*/
bool
MajorVersionsCompatible(char *leftVersion, char *rightVersion)
@ -1203,12 +1299,7 @@ AvailableExtensionVersion(void)
bool hasTuple = false;
bool goForward = true;
bool doCopy = false;
/* if we cached the result before, return it*/
if (availableExtensionVersion != NULL)
{
return availableExtensionVersion;
}
char *availableExtensionVersion;
estate = CreateExecutorState();
extensionsResultSet = makeNode(ReturnSetInfo);
@ -1274,8 +1365,6 @@ AvailableExtensionVersion(void)
/*
* InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table.
* It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion
* will not read the catalog tables again.
*/
static char *
InstalledExtensionVersion(void)
@ -1284,12 +1373,7 @@ InstalledExtensionVersion(void)
SysScanDesc scandesc;
ScanKeyData entry[1];
HeapTuple extensionTuple = NULL;
/* if we cached the result before, return it*/
if (installedExtensionVersion != NULL)
{
return installedExtensionVersion;
}
char *installedExtensionVersion = NULL;
relation = heap_open(ExtensionRelationId, AccessShareLock);
@ -1670,6 +1754,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1731,6 +1817,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1792,6 +1880,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1848,6 +1938,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
@ -1871,6 +1963,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -95,8 +95,11 @@ master_add_node(PG_FUNCTION_ARGS)
bool hasMetadata = false;
bool isActive = false;
bool nodeAlreadyExists = false;
Datum nodeRecord;
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
CheckCitusVersion(ERROR);
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
/*
@ -129,8 +132,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
bool hasMetadata = false;
bool isActive = false;
bool nodeAlreadyExists = false;
Datum nodeRecord;
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
CheckCitusVersion(ERROR);
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
PG_RETURN_CSTRING(nodeRecord);
@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS)
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
CheckCitusVersion(ERROR);
RemoveNodeFromCluster(nodeNameString, nodePort);
PG_RETURN_VOID();
@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS)
bool hasShardPlacements = false;
bool isActive = false;
CheckCitusVersion(ERROR);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS)
char *nodeNameString = text_to_cstring(nodeName);
Datum nodeRecord = 0;
CheckCitusVersion(ERROR);
nodeRecord = ActivateNode(nodeNameString, nodePort);
PG_RETURN_CSTRING(nodeRecord);
@ -263,9 +275,12 @@ Datum
master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
ListCell *workerNodeCell = NULL;
List *workerNodes = ParseWorkerNodeFileAndRename();
List *workerNodes = NULL;
bool nodeAlreadyExists = false;
CheckCitusVersion(ERROR);
workerNodes = ParseWorkerNodeFileAndRename();
foreach(workerNodeCell, workerNodes)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
@ -273,8 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack, false, workerNode->isActive,
&nodeAlreadyExists);
ActivateNode(workerNode->workerName, workerNode->workerPort);
}
PG_RETURN_BOOL(true);
@ -293,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
char distributionMethod = 0;
Oid relationId = InvalidOid;
CheckCitusVersion(ERROR);
/*
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
* we need to check all parameters for NULL values.
@ -1045,7 +1060,7 @@ ParseWorkerNodeFileAndRename()
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNode->isActive = false;
workerNode->isActive = true;
workerNodeList = lappend(workerNodeList, workerNode);
}

View File

@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
DistTableCacheEntry *tableEntry = NULL;
EnsureCoordinator();
CheckCitusVersion(ERROR);
if (!IsDistributedTable(relationId))
{

View File

@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS)
int shardIdCount = 0;
int shardIdIndex = 0;
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS)
int shardIdCount = 0;
int shardIdIndex = 0;
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));

View File

@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
char *taskCallString = text_to_cstring(taskCallStringText);
uint32 taskCallStringLength = strlen(taskCallString);
bool taskTrackerRunning = false;
CheckCitusVersion(ERROR);
/* check that we have a running task tracker on this host */
bool taskTrackerRunning = TaskTrackerRunning();
taskTrackerRunning = TaskTrackerRunning();
if (!taskTrackerRunning)
{
ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW),
@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
WorkerTask *workerTask = NULL;
uint32 taskStatus = 0;
bool taskTrackerRunning = TaskTrackerRunning();
bool taskTrackerRunning = false;
CheckCitusVersion(ERROR);
taskTrackerRunning = TaskTrackerRunning();
if (taskTrackerRunning)
{
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
StringInfo jobDirectoryName = NULL;
StringInfo jobSchemaName = NULL;
CheckCitusVersion(ERROR);
/*
* We first clean up any open connections, and remove tasks belonging to
* this job from the shared hash.

View File

@ -114,6 +114,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
* task directory does not exist. We then lock and create the directory.
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
@ -155,6 +158,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
* task directory does not exist. We then lock and create the directory.
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
@ -415,6 +421,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
@ -443,6 +451,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId,
@ -472,6 +482,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
Oid sequenceRelationId = InvalidOid;
NodeTag nodeType = nodeTag(commandNode);
CheckCitusVersion(ERROR);
if (nodeType != T_CreateSeqStmt)
{
ereport(ERROR,
@ -513,6 +526,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
CheckCitusVersion(ERROR);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
@ -537,6 +552,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
CheckCitusVersion(ERROR);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
@ -1211,6 +1228,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
bool received = false;
StringInfo queryString = NULL;
CheckCitusVersion(ERROR);
/* We extract schema names and table names from qualified names */
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);

View File

@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
Relation distributedRelation = NULL;
List *shardList = LoadShardList(relationId);
List *shardList = NULL;
ListCell *shardCell = NULL;
char relationKind = '\0';
CheckCitusVersion(ERROR);
EnsureSuperUser();
shardList = LoadShardList(relationId);
/* first check the relation type */
distributedRelation = relation_open(relationId, AccessShareLock);
relationKind = distributedRelation->rd_rel->relkind;

View File

@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
ForeignTable *foreignTable = GetForeignTable(relationId);
ListCell *optionCell = NULL;
CheckCitusVersion(ERROR);
foreach(optionCell, foreignTable->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS)
(void) blockId;
(void) dataDirectoryObject;
CheckCitusVersion(ERROR);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("called function is currently unsupported")));

View File

@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
/* we should have the same number of column names and types */
int32 columnNameCount = ArrayObjectCount(columnNameObject);
int32 columnTypeCount = ArrayObjectCount(columnTypeObject);
CheckCitusVersion(ERROR);
if (columnNameCount != columnTypeCount)
{
ereport(ERROR, (errmsg("column name array size: %d and type array size: %d"
@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
int createIntermediateTableResult = 0;
int finished = 0;
CheckCitusVersion(ERROR);
/*
* If the schema for the job isn't already created by the task tracker
* protocol, we fall to using the default 'public' schema.
@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS)
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
CheckCitusVersion(ERROR);
pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock);
scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey);

View File

@ -112,6 +112,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
/* first check that array element's and partition column's types match */
Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
CheckCitusVersion(ERROR);
if (splitPointType != partitionColumnType)
{
ereport(ERROR, (errmsg("partition column type %u and split point type %u "
@ -184,6 +187,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
FileOutputStream *partitionFileArray = NULL;
uint32 fileCount = partitionCount;
CheckCitusVersion(ERROR);
/* use column's type information to get the hashing function */
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC);

View File

@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
EnsureSuperUser();
CheckCitusVersion(ERROR);
/* Create the truncate trigger */
CreateTruncateTrigger(relationId);

View File

@ -18,7 +18,7 @@
#include "utils/hsearch.h"
extern bool EnableVersionChecks;
extern char *availableExtensionVersion;
extern bool citusVersionKnownCompatible;
/*
* Representation of a table's metadata that is frequently used for
@ -79,7 +79,7 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern bool CitusHasBeenLoaded(void);
void ErrorIfAvailableVersionMismatch(void);
extern bool CheckCitusVersion(int elevel);
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
/* access WorkerNodeHash */

View File

@ -109,6 +109,67 @@ CREATE EXTENSION citus VERSION '5.0';
ERROR: specified version incompatible with loaded Citus library
DETAIL: Loaded library requires 7.0, but 5.0 was specified.
HINT: If a newer library is present, restart the database and try the command again.
-- Test non-distributed queries work even in version mismatch
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '6.2-1';
SET citus.enable_version_checks TO 'true';
-- Test CREATE TABLE
CREATE TABLE version_mismatch_table(column1 int);
-- Test COPY
\copy version_mismatch_table FROM STDIN;
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
column1
---------
0
1
2
3
4
5
(6 rows)
-- Test SELECT from pg_catalog
SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
FROM pg_catalog.pg_database d
ORDER BY 1;
Name | Owner | Access privileges
------------+----------+-----------------------
postgres | postgres |
regression | postgres |
template0 | postgres | =c/postgres +
| | postgres=CTc/postgres
template1 | postgres | =c/postgres +
| | postgres=CTc/postgres
(4 rows)
-- We should not distribute table in version mistmatch
SELECT create_distributed_table('version_mismatch_table', 'column1');
ERROR: loaded Citus library version differs from installed extension version
DETAIL: Loaded library requires 7.0, but the installed extension version is 6.2-1.
HINT: Run ALTER EXTENSION citus UPDATE and try again.
-- This function will cause fail in next ALTER EXTENSION
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
RETURNS bigint LANGUAGE plpgsql
AS $function$
BEGIN
END;
$function$;
SET citus.enable_version_checks TO 'false';
-- This will fail because of previous function declaration
ALTER EXTENSION citus UPDATE TO '6.2-2';
ERROR: function "citus_table_size" already exists with same argument types
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
SET citus.enable_version_checks TO 'true';
DROP FUNCTION citus_table_size(regclass);
SET citus.enable_version_checks TO 'false';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- re-create in newest version
DROP EXTENSION citus;
\c
CREATE EXTENSION citus;

View File

@ -101,6 +101,59 @@ RESET citus.enable_version_checks;
DROP EXTENSION citus;
CREATE EXTENSION citus VERSION '5.0';
-- Test non-distributed queries work even in version mismatch
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '6.2-1';
SET citus.enable_version_checks TO 'true';
-- Test CREATE TABLE
CREATE TABLE version_mismatch_table(column1 int);
-- Test COPY
\copy version_mismatch_table FROM STDIN;
0
1
2
3
4
\.
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
-- Test SELECT from pg_catalog
SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
FROM pg_catalog.pg_database d
ORDER BY 1;
-- We should not distribute table in version mistmatch
SELECT create_distributed_table('version_mismatch_table', 'column1');
-- This function will cause fail in next ALTER EXTENSION
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
RETURNS bigint LANGUAGE plpgsql
AS $function$
BEGIN
END;
$function$;
SET citus.enable_version_checks TO 'false';
-- This will fail because of previous function declaration
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
SET citus.enable_version_checks TO 'true';
DROP FUNCTION citus_table_size(regclass);
SET citus.enable_version_checks TO 'false';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- re-create in newest version
DROP EXTENSION citus;
\c
CREATE EXTENSION citus;