diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 825bf84d4..103fedc4f 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) bool requireEmpty = true; EnsureCoordinator(); + CheckCitusVersion(ERROR); if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) { @@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS) char *colocateWithTableName = NULL; EnsureCoordinator(); + CheckCitusVersion(ERROR); /* guard against a binary update without a function update */ if (PG_NARGS() >= 4) @@ -233,13 +235,17 @@ static void CreateReferenceTable(Oid relationId) { uint32 colocationId = INVALID_COLOCATION_ID; - List *workerNodeList = ActiveWorkerNodeList(); - int replicationFactor = list_length(workerNodeList); + List *workerNodeList = NIL; + int replicationFactor = 0; char *distributionColumnName = NULL; bool requireEmpty = true; char relationKind = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); + + workerNodeList = ActiveWorkerNodeList(); + replicationFactor = list_length(workerNodeList); /* if there are no workers, error out */ if (replicationFactor == 0) diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c index 76b29f1c8..734d478d3 100644 --- a/src/backend/distributed/commands/drop_distributed_table.c +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS) char *tableName = text_to_cstring(tableNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index d322324f8..2142b90db 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -99,7 +99,6 @@ struct DropRelationCallbackState /* Local functions forward declarations for deciding when to perform processing/checks */ -static bool SkipCitusProcessingForUtility(Node *parsetree); static bool IsCitusExtensionStmt(Node *parsetree); /* Local functions forward declarations for Transmit statement */ @@ -185,23 +184,29 @@ multi_ProcessUtility(Node *parsetree, Oid savedUserId = InvalidOid; int savedSecurityContext = 0; List *ddlJobs = NIL; - bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree); + bool checkExtensionVersion = false; - if (skipCitusProcessing) + if (IsA(parsetree, TransactionStmt)) { - bool checkExtensionVersion = IsCitusExtensionStmt(parsetree); - + /* + * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted + * transactions in which case a lot of checks cannot be done safely in + * that state. Since we never need to intercept transaction statements, + * skip our checks and immediately fall into standard_ProcessUtility. + */ standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); - if (EnableVersionChecks && checkExtensionVersion) - { - ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); - } - return; } + checkExtensionVersion = IsCitusExtensionStmt(parsetree); + if (EnableVersionChecks && checkExtensionVersion) + { + ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); + } + + if (!CitusHasBeenLoaded()) { /* @@ -447,63 +452,6 @@ multi_ProcessUtility(Node *parsetree, } -/* - * SkipCitusProcessingForUtility simply returns whether a given utility should - * bypass Citus processing and checks and be handled exclusively by standard - * PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION, - * ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus. - */ -static bool -SkipCitusProcessingForUtility(Node *parsetree) -{ - switch (parsetree->type) - { - /* - * In the CitusHasBeenLoaded check, we compare versions of loaded code, - * the installed extension, and available extension. If they differ, we - * force user to execute ALTER EXTENSION citus UPDATE. To allow this, - * CREATE/DROP/ALTER extension must be omitted from Citus processing. - */ - case T_DropStmt: - { - DropStmt *dropStatement = (DropStmt *) parsetree; - - if (dropStatement->removeType != OBJECT_EXTENSION) - { - return false; - } - } - - /* no break, fall through */ - - case T_CreateExtensionStmt: - case T_AlterExtensionStmt: - - /* - * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted - * transactions in which case a lot of checks cannot be done safely in - * that state. Since we never need to intercept transaction statements, - * skip our checks and immediately fall into standard_ProcessUtility. - */ - case T_TransactionStmt: - - /* - * Skip processing of variable set statements, to allow changing the - * enable_version_checks GUC during testing. - */ - case T_VariableSetStmt: - { - return true; - } - - default: - { - return false; - } - } -} - - /* * IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER * EXTENSION statement which references the citus extension. This function @@ -1474,8 +1422,8 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree) * from the citus.control file. In case a new default is available, we * will force a compatibility check of the latest available version. */ - availableExtensionVersion = NULL; - ErrorIfAvailableVersionMismatch(); + citusVersionKnownCompatible = false; + CheckCitusVersion(ERROR); } } diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index b46b60739..946ec16cf 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS) int commandIndex = 0; int commandCount = 0; + CheckCitusVersion(ERROR); + /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) { diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 56ec7e788..a56f283bf 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) Oid distributedTableId = ResolveRelationId(tableNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index ed8418aa4..8f5943696 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) bool failOK = false; EnsureCoordinator(); + CheckCitusVersion(ERROR); queryTreeNode = ParseTreeNode(queryString); if (!IsA(queryTreeNode, DeleteStmt)) @@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS) char *relationName = text_to_cstring(relationNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName); @@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS) StringInfo dropSeqCommand = makeStringInfo(); bool coordinator = IsCoordinator(); + CheckCitusVersion(ERROR); + /* do nothing if DDL propagation is switched off or this is not the coordinator */ if (!EnableDDLPropagation || !coordinator) { diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index 70eef82aa..8d4f38b03 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -46,14 +46,21 @@ Datum master_expire_table_cache(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); - List *workerNodeList = ActiveWorkerNodeList(); + DistTableCacheEntry *cacheEntry = NULL; + List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; - int shardCount = cacheEntry->shardIntervalArrayLength; - ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray; + int shardCount = 0; + ShardInterval **shardIntervalArray = NULL; List **placementListArray = NULL; int shardIndex = 0; + CheckCitusVersion(ERROR); + + cacheEntry = DistributedTableCacheEntry(relationId); + workerNodeList = ActiveWorkerNodeList(); + shardCount = cacheEntry->shardIntervalArrayLength; + shardIntervalArray = cacheEntry->sortedShardIntervalArray; + if (shardCount == 0) { ereport(WARNING, (errmsg("Table has no shards, no action is taken"))); diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index e6bfa574b..4081ad35b 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 totalRelationSize = 0; + CheckCitusVersion(ERROR); + totalRelationSize = DistributedTableSize(relationId, PG_TOTAL_RELATION_SIZE_FUNCTION); @@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 tableSize = 0; + CheckCitusVersion(ERROR); + tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION); PG_RETURN_INT64(tableSize); @@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 relationSize = 0; + CheckCitusVersion(ERROR); + relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION); PG_RETURN_INT64(relationSize); diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 086846fde..f7838b08c 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) int32 affectedTupleCount = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); + queryTreeNode = ParseTreeNode(queryString); if (IsA(queryTreeNode, DeleteStmt)) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index db83b2665..d7214cf04 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS) Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; + CheckCitusVersion(ERROR); + /* find partition tuple for partitioned relation */ partitionEntry = DistributedTableCacheEntry(relationId); @@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) FuncCallContext *functionContext = NULL; ListCell *tableDDLEventCell = NULL; + CheckCitusVersion(ERROR); + /* * On the very first call to this function, we first use the given relation * name to get to the relation. We then recreate the list of DDL statements @@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS) Datum shardIdDatum = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); shardId = GetNextShardId(); shardIdDatum = Int64GetDatum(shardId); @@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS) Datum placementIdDatum = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); placementId = GetNextPlacementId(); placementIdDatum = Int64GetDatum(placementId); @@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) uint32 workerNodeIndex = 0; uint32 workerNodeCount = 0; + CheckCitusVersion(ERROR); + if (SRF_IS_FIRSTCALL()) { MemoryContext oldContext = NULL; diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 2471da365..1774895f8 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } EnsureCoordinator(); + CheckCitusVersion(ERROR); /* RepairShardPlacement function repairs only given shard */ RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, diff --git a/src/backend/distributed/master/master_split_shards.c b/src/backend/distributed/master/master_split_shards.c index cc18a3c3e..a1e928e5d 100644 --- a/src/backend/distributed/master/master_split_shards.c +++ b/src/backend/distributed/master/master_split_shards.c @@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS) FmgrInfo *hashFunction = NULL; Oid valueDataType = InvalidOid; + CheckCitusVersion(ERROR); + /* figure out hash function from the data type */ valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0); typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index f24bfc07c..cf492bc02 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) { text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = NIL; uint64 shardId = INVALID_SHARD_ID; List *ddlEventList = NULL; uint32 attemptableNodeCount = 0; @@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS) char replicationModel = REPLICATION_MODEL_INVALID; bool includeSequenceDefaults = false; + CheckCitusVersion(ERROR); + + workerNodeList = ActiveWorkerNodeList(); + EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); @@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) float4 shardFillLevel = 0.0; char partitionMethod = 0; - ShardInterval *shardInterval = LoadShardInterval(shardId); - Oid relationId = shardInterval->relationId; - bool cstoreTable = CStoreTable(relationId); + ShardInterval *shardInterval = NULL; + Oid relationId = InvalidOid; + bool cstoreTable = false; - char storageType = shardInterval->storageType; + char storageType = 0; + + CheckCitusVersion(ERROR); + + shardInterval = LoadShardInterval(shardId); + relationId = shardInterval->relationId; + cstoreTable = CStoreTable(relationId); + storageType = shardInterval->storageType; EnsureTablePermissions(relationId, ACL_INSERT); @@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS) int64 shardId = PG_GETARG_INT64(0); uint64 shardSize = 0; + CheckCitusVersion(ERROR); + shardSize = UpdateShardStatistics(shardId); PG_RETURN_INT64(shardSize); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index af1aa5257..e7a659c2b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); EnsureSuperUser(); + CheckCitusVersion(ERROR); PreventTransactionChain(true, "start_metadata_sync_to_node"); @@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); EnsureSuperUser(); + CheckCitusVersion(ERROR); workerNode = FindWorkerNode(nodeNameString, nodePort); if (workerNode == NULL) diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index eed0e881f..bc61da45e 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -28,6 +28,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" +#include "distributed/metadata_cache.h" #include "distributed/relay_utility.h" #include "lib/stringinfo.h" #include "mb/pg_wchar.h" @@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS) errmsg("shard_id cannot be null"))); } - relationId = PG_GETARG_OID(0); shardId = PG_GETARG_INT64(1); + CheckCitusVersion(ERROR); + if (shardId <= 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 70e16bd1d..edff64850 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) { int recoveredTransactionCount = 0; + CheckCitusVersion(ERROR); + recoveredTransactionCount = RecoverPreparedTransactions(); PG_RETURN_INT32(recoveredTransactionCount); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 7dcade9cd..a7357f20d 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS) } EnsureCoordinator(); + CheckCitusVersion(ERROR); relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 0238b6fdd..e2a925a04 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -17,6 +17,7 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "distributed/distribution_column.h" +#include "distributed/metadata_cache.h" #include "nodes/makefuncs.h" #include "nodes/nodes.h" #include "nodes/primnodes.h" @@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS) char *columnNodeString = NULL; text *columnNodeText = NULL; + CheckCitusVersion(ERROR); + relation = relation_open(relationId, AccessShareLock); column = BuildDistributionKeyFromColumnName(relation, columnName); @@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS) char *columnName = NULL; text *columnText = NULL; + CheckCitusVersion(ERROR); + columnName = ColumnNameToColumn(relationId, columnNodeString); columnText = cstring_to_text(columnName); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 66a2af3da..128461a6a 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid; /* Citus extension version variables */ bool EnableVersionChecks = true; /* version checks are enabled */ -char *availableExtensionVersion = NULL; -static char *installedExtensionVersion = NULL; +bool citusVersionKnownCompatible = false; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; @@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ +static bool IsDistributedTableViaCatalog(Oid relationId); static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); @@ -142,7 +142,8 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); -static void ErrorIfInstalledVersionMismatch(void); +static bool CheckInstalledVersion(int elevel); +static bool CheckAvailableVersion(int elevel); static char * AvailableExtensionVersion(void); static char * InstalledExtensionVersion(void); static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, @@ -184,22 +185,59 @@ IsDistributedTable(Oid relationId) { DistTableCacheEntry *cacheEntry = NULL; + cacheEntry = LookupDistTableCacheEntry(relationId); + /* - * Can't be a distributed relation if the extension hasn't been loaded - * yet. As we can't do lookups in nonexistent tables, directly return - * false. + * If extension hasn't been created, or has the wrong version and the + * table isn't a distributed one, LookupDistTableCacheEntry() will return NULL. */ - if (!CitusHasBeenLoaded()) + if (!cacheEntry) { return false; } - cacheEntry = LookupDistTableCacheEntry(relationId); - return cacheEntry->isDistributedTable; } +/* + * IsDistributedTableViaCatalog returns whether the given relation is a + * distributed table or not. + * + * It does so by searching pg_dist_partition, explicitly bypassing caches, + * because this function is designed to be used in cases where accessing + * metadata tables is not safe. + * + * NB: Currently this still hardcodes pg_dist_partition logicalrelid column + * offset and the corresponding index. If we ever come close to changing + * that, we'll have to work a bit harder. + */ +static bool +IsDistributedTableViaCatalog(Oid relationId) +{ + HeapTuple partitionTuple = NULL; + SysScanDesc scanDescriptor = NULL; + const int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + + Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); + + scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionLogicalRelidIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + partitionTuple = systable_getnext(scanDescriptor); + systable_endscan(scanDescriptor); + heap_close(pgDistPartition, AccessShareLock); + + return HeapTupleIsValid(partitionTuple); +} + + /* * DistributedTableList returns a list that includes all the valid distributed table * cache entries. @@ -211,6 +249,8 @@ DistributedTableList(void) List *distributedTableList = NIL; ListCell *distTableOidCell = NULL; + Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); + /* first, we need to iterate over pg_dist_partition */ distTableOidList = DistTableOidList(); @@ -360,6 +400,8 @@ LookupShardCacheEntry(int64 shardId) bool foundInCache = false; bool recheck = false; + Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); + /* probably not reachable */ if (DistShardCacheHash == NULL) { @@ -435,27 +477,16 @@ DistributedTableCacheEntry(Oid distributedRelationId) { DistTableCacheEntry *cacheEntry = NULL; - /* - * Can't be a distributed relation if the extension hasn't been loaded - * yet. As we can't do lookups in nonexistent tables, directly return NULL - * here. - */ - if (!CitusHasBeenLoaded()) - { - return NULL; - } - cacheEntry = LookupDistTableCacheEntry(distributedRelationId); - if (cacheEntry->isDistributedTable) + if (cacheEntry && cacheEntry->isDistributedTable) { return cacheEntry; } else { char *relationName = get_rel_name(distributedRelationId); - ereport(ERROR, (errmsg("relation %s is not distributed", - relationName))); + ereport(ERROR, (errmsg("relation %s is not distributed", relationName))); } } @@ -471,6 +502,43 @@ LookupDistTableCacheEntry(Oid relationId) bool foundInCache = false; void *hashKey = (void *) &relationId; + /* + * Can't be a distributed relation if the extension hasn't been loaded + * yet. As we can't do lookups in nonexistent tables, directly return NULL + * here. + */ + if (!CitusHasBeenLoaded()) + { + return NULL; + } + + /* + * If the version is not known to be compatible, perform thorough check, + * unless such checks are disabled. + */ + if (!citusVersionKnownCompatible && EnableVersionChecks) + { + bool isDistributed = IsDistributedTableViaCatalog(relationId); + int reportLevel = DEBUG1; + + /* + * If there's a version-mismatch, and we're dealing with a distributed + * table, we have to error out as we can't return a valid entry. We + * want to check compatibility in the non-distributed case as well, so + * future lookups can use the cache if compatible. + */ + if (isDistributed) + { + reportLevel = ERROR; + } + + if (!CheckCitusVersion(reportLevel)) + { + /* incompatible, can't access cache, so return before doing so */ + return NULL; + } + } + if (DistTableCacheHash == NULL) { InitializeDistTableCache(); @@ -1066,86 +1134,114 @@ CitusHasBeenLoaded(void) DistPartitionRelationId(); /* - * We also set installedExtensionVersion to NULL so that it will be re-read - * in case of extension update. + * We also reset citusVersionKnownCompatible, so it will be re-read in + * case of extension update. */ - installedExtensionVersion = NULL; + citusVersionKnownCompatible = false; } } - if (extensionLoaded) - { - ErrorIfAvailableVersionMismatch(); - ErrorIfInstalledVersionMismatch(); - } - return extensionLoaded; } /* - * ErrorIfAvailableExtensionVersionMismatch compares CITUS_EXTENSIONVERSION and - * currently available version from citus.control file. If they are not same in - * major or minor version numbers, this function errors out. It ignores the schema - * version. + * CheckCitusVersion checks whether there is a version mismatch between the + * available version and the loaded version or between the installed version + * and the loaded version. Returns true if compatible, false otherwise. + * + * As a side effect, this function also sets citusVersionKnownCompatible global + * variable to true which reduces version check cost of next calls. */ -void -ErrorIfAvailableVersionMismatch(void) +bool +CheckCitusVersion(int elevel) +{ + if (citusVersionKnownCompatible || + !CitusHasBeenLoaded() || + !EnableVersionChecks) + { + return true; + } + + if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel)) + { + citusVersionKnownCompatible = true; + return true; + } + else + { + return false; + } +} + + +/* + * CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently + * available version from the citus.control file. If they are not compatible, + * this function logs an error with the specified elevel and returns false, + * otherwise it returns true. + */ +static bool +CheckAvailableVersion(int elevel) { char *availableVersion = NULL; - if (!EnableVersionChecks) - { - return; - } + Assert(CitusHasBeenLoaded()); + Assert(EnableVersionChecks); availableVersion = AvailableExtensionVersion(); + if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION)) { - ereport(ERROR, (errmsg("loaded Citus library version differs from latest " - "available extension version"), - errdetail("Loaded library requires %s, but the latest control " - "file specifies %s.", CITUS_MAJORVERSION, - availableVersion), - errhint("Restart the database to load the latest Citus " - "library."))); + ereport(elevel, (errmsg("loaded Citus library version differs from latest " + "available extension version"), + errdetail("Loaded library requires %s, but the latest control " + "file specifies %s.", CITUS_MAJORVERSION, + availableVersion), + errhint("Restart the database to load the latest Citus " + "library."))); + return false; } + + return true; } /* - * ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and currently - * and catalog version from pg_extemsion catalog table. If they are not same in - * major or minor version numbers, this function errors out. It ignores the schema - * version. + * CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the the + * extension's current version from the pg_extemsion catalog table. If they + * are not compatible, this function logs an error with the specified elevel, + * otherwise it returns true. */ -static void -ErrorIfInstalledVersionMismatch(void) +static bool +CheckInstalledVersion(int elevel) { char *installedVersion = NULL; - if (!EnableVersionChecks) - { - return; - } + Assert(CitusHasBeenLoaded()); + Assert(EnableVersionChecks); installedVersion = InstalledExtensionVersion(); + if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION)) { - ereport(ERROR, (errmsg("loaded Citus library version differs from installed " - "extension version"), - errdetail("Loaded library requires %s, but the installed " - "extension version is %s.", CITUS_MAJORVERSION, - installedVersion), - errhint("Run ALTER EXTENSION citus UPDATE and try again."))); + ereport(elevel, (errmsg("loaded Citus library version differs from installed " + "extension version"), + errdetail("Loaded library requires %s, but the installed " + "extension version is %s.", CITUS_MAJORVERSION, + installedVersion), + errhint("Run ALTER EXTENSION citus UPDATE and try again."))); + return false; } + + return true; } /* - * MajorVersionsCompatible compares given two versions. If they are same in major - * and minor version numbers, this function returns true. It ignores the schema - * version. + * MajorVersionsCompatible checks whether both versions are compatible. They + * are if major and minor version numbers match, the schema version is + * ignored. Returns true if compatible, false otherwise. */ bool MajorVersionsCompatible(char *leftVersion, char *rightVersion) @@ -1203,12 +1299,7 @@ AvailableExtensionVersion(void) bool hasTuple = false; bool goForward = true; bool doCopy = false; - - /* if we cached the result before, return it*/ - if (availableExtensionVersion != NULL) - { - return availableExtensionVersion; - } + char *availableExtensionVersion; estate = CreateExecutorState(); extensionsResultSet = makeNode(ReturnSetInfo); @@ -1274,8 +1365,6 @@ AvailableExtensionVersion(void) /* * InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table. - * It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion - * will not read the catalog tables again. */ static char * InstalledExtensionVersion(void) @@ -1284,12 +1373,7 @@ InstalledExtensionVersion(void) SysScanDesc scandesc; ScanKeyData entry[1]; HeapTuple extensionTuple = NULL; - - /* if we cached the result before, return it*/ - if (installedExtensionVersion != NULL) - { - return installedExtensionVersion; - } + char *installedExtensionVersion = NULL; relation = heap_open(ExtensionRelationId, AccessShareLock); @@ -1670,6 +1754,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1731,6 +1817,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1792,6 +1880,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1848,6 +1938,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -1871,6 +1963,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index fd4d8970b..ecac82267 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -95,9 +95,12 @@ master_add_node(PG_FUNCTION_ARGS) bool hasMetadata = false; bool isActive = false; bool nodeAlreadyExists = false; + Datum nodeRecord; - Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + CheckCitusVersion(ERROR); + + nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); /* * After adding new node, if the node did not already exist, we will activate @@ -129,9 +132,12 @@ master_add_inactive_node(PG_FUNCTION_ARGS) bool hasMetadata = false; bool isActive = false; bool nodeAlreadyExists = false; + Datum nodeRecord; - Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + CheckCitusVersion(ERROR); + + nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); PG_RETURN_CSTRING(nodeRecord); } @@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS) int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); + CheckCitusVersion(ERROR); + RemoveNodeFromCluster(nodeNameString, nodePort); PG_RETURN_VOID(); @@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS) bool hasShardPlacements = false; bool isActive = false; + CheckCitusVersion(ERROR); + DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); @@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS) char *nodeNameString = text_to_cstring(nodeName); Datum nodeRecord = 0; + CheckCitusVersion(ERROR); + nodeRecord = ActivateNode(nodeNameString, nodePort); PG_RETURN_CSTRING(nodeRecord); @@ -263,9 +275,12 @@ Datum master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; - List *workerNodes = ParseWorkerNodeFileAndRename(); + List *workerNodes = NULL; bool nodeAlreadyExists = false; + CheckCitusVersion(ERROR); + + workerNodes = ParseWorkerNodeFileAndRename(); foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); @@ -273,8 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, workerNode->workerRack, false, workerNode->isActive, &nodeAlreadyExists); - - ActivateNode(workerNode->workerName, workerNode->workerPort); } PG_RETURN_BOOL(true); @@ -293,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) char distributionMethod = 0; Oid relationId = InvalidOid; + CheckCitusVersion(ERROR); + /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore * we need to check all parameters for NULL values. @@ -1045,7 +1060,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; workerNode->hasMetadata = false; - workerNode->isActive = false; + workerNode->isActive = true; workerNodeList = lappend(workerNodeList, workerNode); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 2b09f7878..ecda24eb1 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) DistTableCacheEntry *tableEntry = NULL; EnsureCoordinator(); + CheckCitusVersion(ERROR); if (!IsDistributedTable(relationId)) { diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 766bc166a..15daf3d53 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS) int shardIdCount = 0; int shardIdIndex = 0; + CheckCitusVersion(ERROR); + if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); @@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS) int shardIdCount = 0; int shardIdIndex = 0; + CheckCitusVersion(ERROR); + if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index aea2cdbd5..25634f953 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) char *taskCallString = text_to_cstring(taskCallStringText); uint32 taskCallStringLength = strlen(taskCallString); + bool taskTrackerRunning = false; + + CheckCitusVersion(ERROR); + /* check that we have a running task tracker on this host */ - bool taskTrackerRunning = TaskTrackerRunning(); + taskTrackerRunning = TaskTrackerRunning(); if (!taskTrackerRunning) { ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW), @@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS) WorkerTask *workerTask = NULL; uint32 taskStatus = 0; - bool taskTrackerRunning = TaskTrackerRunning(); + bool taskTrackerRunning = false; + + CheckCitusVersion(ERROR); + + taskTrackerRunning = TaskTrackerRunning(); + if (taskTrackerRunning) { LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); @@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) StringInfo jobDirectoryName = NULL; StringInfo jobSchemaName = NULL; + CheckCitusVersion(ERROR); + /* * We first clean up any open connections, and remove tasks belonging to * this job from the shared hash. diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 3fe98fdf4..4e1045f57 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -114,6 +114,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS) * task directory does not exist. We then lock and create the directory. */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); + + CheckCitusVersion(ERROR); + if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -155,6 +158,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS) * task directory does not exist. We then lock and create the directory. */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); + + CheckCitusVersion(ERROR); + if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -415,6 +421,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); + CheckCitusVersion(ERROR); + /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, @@ -443,6 +451,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); + CheckCitusVersion(ERROR); + /* extend names in ddl command and apply extended command */ RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, @@ -472,6 +482,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) Oid sequenceRelationId = InvalidOid; NodeTag nodeType = nodeTag(commandNode); + + CheckCitusVersion(ERROR); + if (nodeType != T_CreateSeqStmt) { ereport(ERROR, @@ -513,6 +526,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS) ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + CheckCitusVersion(ERROR); + /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. @@ -537,6 +552,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS) ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + CheckCitusVersion(ERROR); + /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. @@ -1211,6 +1228,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) bool received = false; StringInfo queryString = NULL; + CheckCitusVersion(ERROR); + /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index d802f643f..e55da62de 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 }; Relation distributedRelation = NULL; - List *shardList = LoadShardList(relationId); + List *shardList = NULL; ListCell *shardCell = NULL; char relationKind = '\0'; + CheckCitusVersion(ERROR); EnsureSuperUser(); + shardList = LoadShardList(relationId); + /* first check the relation type */ distributedRelation = relation_open(relationId, AccessShareLock); relationKind = distributedRelation->rd_rel->relkind; diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c index 546eaddea..9ec8882b2 100644 --- a/src/backend/distributed/worker/worker_file_access_protocol.c +++ b/src/backend/distributed/worker/worker_file_access_protocol.c @@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS) ForeignTable *foreignTable = GetForeignTable(relationId); ListCell *optionCell = NULL; + + CheckCitusVersion(ERROR); + foreach(optionCell, foreignTable->options) { DefElem *option = (DefElem *) lfirst(optionCell); @@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS) (void) blockId; (void) dataDirectoryObject; + CheckCitusVersion(ERROR); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("called function is currently unsupported"))); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 6754f4acd..f1714e709 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) /* we should have the same number of column names and types */ int32 columnNameCount = ArrayObjectCount(columnNameObject); int32 columnTypeCount = ArrayObjectCount(columnTypeObject); + + CheckCitusVersion(ERROR); + if (columnNameCount != columnTypeCount) { ereport(ERROR, (errmsg("column name array size: %d and type array size: %d" @@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) int createIntermediateTableResult = 0; int finished = 0; + CheckCitusVersion(ERROR); + /* * If the schema for the job isn't already created by the task tracker * protocol, we fall to using the default 'public' schema. @@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS) int scanKeyCount = 0; HeapTuple heapTuple = NULL; + CheckCitusVersion(ERROR); + pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock); scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index d7731f426..b15915f78 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -112,6 +112,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS) /* first check that array element's and partition column's types match */ Oid splitPointType = ARR_ELEMTYPE(splitPointObject); + + CheckCitusVersion(ERROR); + if (splitPointType != partitionColumnType) { ereport(ERROR, (errmsg("partition column type %u and split point type %u " @@ -184,6 +187,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) FileOutputStream *partitionFileArray = NULL; uint32 fileCount = partitionCount; + CheckCitusVersion(ERROR); + /* use column's type information to get the hashing function */ hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC); diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c index 237768e70..099c38fb8 100644 --- a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c +++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c @@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); EnsureSuperUser(); + CheckCitusVersion(ERROR); /* Create the truncate trigger */ CreateTruncateTrigger(relationId); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index ae83940d4..14b0e86c6 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -18,7 +18,7 @@ #include "utils/hsearch.h" extern bool EnableVersionChecks; -extern char *availableExtensionVersion; +extern bool citusVersionKnownCompatible; /* * Representation of a table's metadata that is frequently used for @@ -79,7 +79,7 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern bool CitusHasBeenLoaded(void); -void ErrorIfAvailableVersionMismatch(void); +extern bool CheckCitusVersion(int elevel); bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); /* access WorkerNodeHash */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 4236ed162..9715eb3b5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -109,6 +109,67 @@ CREATE EXTENSION citus VERSION '5.0'; ERROR: specified version incompatible with loaded Citus library DETAIL: Loaded library requires 7.0, but 5.0 was specified. HINT: If a newer library is present, restart the database and try the command again. +-- Test non-distributed queries work even in version mismatch +SET citus.enable_version_checks TO 'false'; +CREATE EXTENSION citus VERSION '6.2-1'; +SET citus.enable_version_checks TO 'true'; +-- Test CREATE TABLE +CREATE TABLE version_mismatch_table(column1 int); +-- Test COPY +\copy version_mismatch_table FROM STDIN; +-- Test INSERT +INSERT INTO version_mismatch_table(column1) VALUES(5); + +-- Test SELECT +SELECT * FROM version_mismatch_table ORDER BY column1; + column1 +--------- + 0 + 1 + 2 + 3 + 4 + 5 +(6 rows) + +-- Test SELECT from pg_catalog +SELECT d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges" +FROM pg_catalog.pg_database d +ORDER BY 1; + Name | Owner | Access privileges +------------+----------+----------------------- + postgres | postgres | + regression | postgres | + template0 | postgres | =c/postgres + + | | postgres=CTc/postgres + template1 | postgres | =c/postgres + + | | postgres=CTc/postgres +(4 rows) + +-- We should not distribute table in version mistmatch +SELECT create_distributed_table('version_mismatch_table', 'column1'); +ERROR: loaded Citus library version differs from installed extension version +DETAIL: Loaded library requires 7.0, but the installed extension version is 6.2-1. +HINT: Run ALTER EXTENSION citus UPDATE and try again. +-- This function will cause fail in next ALTER EXTENSION +CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass) +RETURNS bigint LANGUAGE plpgsql +AS $function$ +BEGIN +END; +$function$; +SET citus.enable_version_checks TO 'false'; +-- This will fail because of previous function declaration +ALTER EXTENSION citus UPDATE TO '6.2-2'; +ERROR: function "citus_table_size" already exists with same argument types +-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on +SET citus.enable_version_checks TO 'true'; +DROP FUNCTION citus_table_size(regclass); +SET citus.enable_version_checks TO 'false'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; -- re-create in newest version +DROP EXTENSION citus; \c CREATE EXTENSION citus; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index db625ecd6..327644f9c 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -101,6 +101,59 @@ RESET citus.enable_version_checks; DROP EXTENSION citus; CREATE EXTENSION citus VERSION '5.0'; +-- Test non-distributed queries work even in version mismatch +SET citus.enable_version_checks TO 'false'; +CREATE EXTENSION citus VERSION '6.2-1'; +SET citus.enable_version_checks TO 'true'; + +-- Test CREATE TABLE +CREATE TABLE version_mismatch_table(column1 int); + +-- Test COPY +\copy version_mismatch_table FROM STDIN; +0 +1 +2 +3 +4 +\. + +-- Test INSERT +INSERT INTO version_mismatch_table(column1) VALUES(5); + +-- Test SELECT +SELECT * FROM version_mismatch_table ORDER BY column1; + +-- Test SELECT from pg_catalog +SELECT d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges" +FROM pg_catalog.pg_database d +ORDER BY 1; + +-- We should not distribute table in version mistmatch +SELECT create_distributed_table('version_mismatch_table', 'column1'); + +-- This function will cause fail in next ALTER EXTENSION +CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass) +RETURNS bigint LANGUAGE plpgsql +AS $function$ +BEGIN +END; +$function$; + +SET citus.enable_version_checks TO 'false'; +-- This will fail because of previous function declaration +ALTER EXTENSION citus UPDATE TO '6.2-2'; + +-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on +SET citus.enable_version_checks TO 'true'; +DROP FUNCTION citus_table_size(regclass); + +SET citus.enable_version_checks TO 'false'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; + -- re-create in newest version +DROP EXTENSION citus; \c CREATE EXTENSION citus;