From acb0d237175190c503b39259da85349672edc702 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 18 May 2017 20:42:14 +0300 Subject: [PATCH 1/4] Fix crash during upgrade from 5.2 to 6.2 This commit fixes the problem where we incorrectly try to reach distributed table cache when the extension is not loaded completely. We tried to reach the cache because we wanted to get reference table information to activate the node. However it is actually not necessary to explicitly activate the nodes which come from master_initialize_node_metadata. Because it only runs during extension creation and at that time there are no reference tables and all nodes are considered as active. --- src/backend/distributed/utils/node_metadata.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index fd4d8970b..82916b85f 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -274,7 +274,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) workerNode->workerRack, false, workerNode->isActive, &nodeAlreadyExists); - ActivateNode(workerNode->workerName, workerNode->workerPort); } PG_RETURN_BOOL(true); @@ -1045,7 +1044,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; workerNode->hasMetadata = false; - workerNode->isActive = false; + workerNode->isActive = true; workerNodeList = lappend(workerNodeList, workerNode); } From eea8c51e1f25cba0dc711a299735e481f90c4c4b Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Fri, 19 May 2017 19:10:07 +0300 Subject: [PATCH 2/4] Only error out on distributed queries when there is version mismatch Before this commit, we were erroring out at almost all queries if there is a version mismatch. With this commit, we started to error out only requested operation touches distributed tables. Normally we would need to use distributed cache to understand whether a table is distributed or not. However, it is not safe to read our metadata tables when there is a version mismatch, thus it is not safe to create distributed cache. Therefore for this specific occasion, we directly read from pg_dist_partition table. However; reading from catalog is costly and we should not use this method in other places as much as possible. --- .../distributed/executor/multi_utility.c | 86 ++---- .../distributed/utils/metadata_cache.c | 246 ++++++++++++------ src/include/distributed/metadata_cache.h | 4 +- 3 files changed, 184 insertions(+), 152 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index d322324f8..2142b90db 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -99,7 +99,6 @@ struct DropRelationCallbackState /* Local functions forward declarations for deciding when to perform processing/checks */ -static bool SkipCitusProcessingForUtility(Node *parsetree); static bool IsCitusExtensionStmt(Node *parsetree); /* Local functions forward declarations for Transmit statement */ @@ -185,23 +184,29 @@ multi_ProcessUtility(Node *parsetree, Oid savedUserId = InvalidOid; int savedSecurityContext = 0; List *ddlJobs = NIL; - bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree); + bool checkExtensionVersion = false; - if (skipCitusProcessing) + if (IsA(parsetree, TransactionStmt)) { - bool checkExtensionVersion = IsCitusExtensionStmt(parsetree); - + /* + * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted + * transactions in which case a lot of checks cannot be done safely in + * that state. Since we never need to intercept transaction statements, + * skip our checks and immediately fall into standard_ProcessUtility. + */ standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); - if (EnableVersionChecks && checkExtensionVersion) - { - ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); - } - return; } + checkExtensionVersion = IsCitusExtensionStmt(parsetree); + if (EnableVersionChecks && checkExtensionVersion) + { + ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); + } + + if (!CitusHasBeenLoaded()) { /* @@ -447,63 +452,6 @@ multi_ProcessUtility(Node *parsetree, } -/* - * SkipCitusProcessingForUtility simply returns whether a given utility should - * bypass Citus processing and checks and be handled exclusively by standard - * PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION, - * ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus. - */ -static bool -SkipCitusProcessingForUtility(Node *parsetree) -{ - switch (parsetree->type) - { - /* - * In the CitusHasBeenLoaded check, we compare versions of loaded code, - * the installed extension, and available extension. If they differ, we - * force user to execute ALTER EXTENSION citus UPDATE. To allow this, - * CREATE/DROP/ALTER extension must be omitted from Citus processing. - */ - case T_DropStmt: - { - DropStmt *dropStatement = (DropStmt *) parsetree; - - if (dropStatement->removeType != OBJECT_EXTENSION) - { - return false; - } - } - - /* no break, fall through */ - - case T_CreateExtensionStmt: - case T_AlterExtensionStmt: - - /* - * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted - * transactions in which case a lot of checks cannot be done safely in - * that state. Since we never need to intercept transaction statements, - * skip our checks and immediately fall into standard_ProcessUtility. - */ - case T_TransactionStmt: - - /* - * Skip processing of variable set statements, to allow changing the - * enable_version_checks GUC during testing. - */ - case T_VariableSetStmt: - { - return true; - } - - default: - { - return false; - } - } -} - - /* * IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER * EXTENSION statement which references the citus extension. This function @@ -1474,8 +1422,8 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree) * from the citus.control file. In case a new default is available, we * will force a compatibility check of the latest available version. */ - availableExtensionVersion = NULL; - ErrorIfAvailableVersionMismatch(); + citusVersionKnownCompatible = false; + CheckCitusVersion(ERROR); } } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 66a2af3da..d97ce3625 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid; /* Citus extension version variables */ bool EnableVersionChecks = true; /* version checks are enabled */ -char *availableExtensionVersion = NULL; -static char *installedExtensionVersion = NULL; +bool citusVersionKnownCompatible = false; /* Hash table for informations about each partition */ static HTAB *DistTableCacheHash = NULL; @@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1]; /* local function forward declarations */ +static bool IsDistributedTableViaCatalog(Oid relationId); static ShardCacheEntry * LookupShardCacheEntry(int64 shardId); static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId); static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry); @@ -142,7 +142,8 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); -static void ErrorIfInstalledVersionMismatch(void); +static bool CheckInstalledVersion(int elevel); +static bool CheckAvailableVersion(int elevel); static char * AvailableExtensionVersion(void); static char * InstalledExtensionVersion(void); static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, @@ -184,22 +185,59 @@ IsDistributedTable(Oid relationId) { DistTableCacheEntry *cacheEntry = NULL; + cacheEntry = LookupDistTableCacheEntry(relationId); + /* - * Can't be a distributed relation if the extension hasn't been loaded - * yet. As we can't do lookups in nonexistent tables, directly return - * false. + * If extension hasn't been created, or has the wrong version and the + * table isn't a distributed one, LookupDistTableCacheEntry() will return NULL. */ - if (!CitusHasBeenLoaded()) + if (!cacheEntry) { return false; } - cacheEntry = LookupDistTableCacheEntry(relationId); - return cacheEntry->isDistributedTable; } +/* + * IsDistributedTableViaCatalog returns whether the given relation is a + * distributed table or not. + * + * It does so by searching pg_dist_partition, explicitly bypassing caches, + * because this function is designed to be used in cases where accessing + * metadata tables is not safe. + * + * NB: Currently this still hardcodes pg_dist_partition logicalrelid column + * offset and the corresponding index. If we ever come close to changing + * that, we'll have to work a bit harder. + */ +static bool +IsDistributedTableViaCatalog(Oid relationId) +{ + HeapTuple partitionTuple = NULL; + SysScanDesc scanDescriptor = NULL; + const int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + + Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); + + scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionLogicalRelidIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + partitionTuple = systable_getnext(scanDescriptor); + systable_endscan(scanDescriptor); + heap_close(pgDistPartition, AccessShareLock); + + return HeapTupleIsValid(partitionTuple); +} + + /* * DistributedTableList returns a list that includes all the valid distributed table * cache entries. @@ -211,6 +249,8 @@ DistributedTableList(void) List *distributedTableList = NIL; ListCell *distTableOidCell = NULL; + Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); + /* first, we need to iterate over pg_dist_partition */ distTableOidList = DistTableOidList(); @@ -360,6 +400,8 @@ LookupShardCacheEntry(int64 shardId) bool foundInCache = false; bool recheck = false; + Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); + /* probably not reachable */ if (DistShardCacheHash == NULL) { @@ -435,27 +477,16 @@ DistributedTableCacheEntry(Oid distributedRelationId) { DistTableCacheEntry *cacheEntry = NULL; - /* - * Can't be a distributed relation if the extension hasn't been loaded - * yet. As we can't do lookups in nonexistent tables, directly return NULL - * here. - */ - if (!CitusHasBeenLoaded()) - { - return NULL; - } - cacheEntry = LookupDistTableCacheEntry(distributedRelationId); - if (cacheEntry->isDistributedTable) + if (cacheEntry && cacheEntry->isDistributedTable) { return cacheEntry; } else { char *relationName = get_rel_name(distributedRelationId); - ereport(ERROR, (errmsg("relation %s is not distributed", - relationName))); + ereport(ERROR, (errmsg("relation %s is not distributed", relationName))); } } @@ -471,6 +502,43 @@ LookupDistTableCacheEntry(Oid relationId) bool foundInCache = false; void *hashKey = (void *) &relationId; + /* + * Can't be a distributed relation if the extension hasn't been loaded + * yet. As we can't do lookups in nonexistent tables, directly return NULL + * here. + */ + if (!CitusHasBeenLoaded()) + { + return NULL; + } + + /* + * If the version is not known to be compatible, perform thorough check, + * unless such checks are disabled. + */ + if (!citusVersionKnownCompatible && EnableVersionChecks) + { + bool isDistributed = IsDistributedTableViaCatalog(relationId); + int reportLevel = DEBUG1; + + /* + * If there's a version-mismatch, and we're dealing with a distributed + * table, we have to error out as we can't return a valid entry. We + * want to check compatibility in the non-distributed case as well, so + * future lookups can use the cache if compatible. + */ + if (isDistributed) + { + reportLevel = ERROR; + } + + if (!CheckCitusVersion(reportLevel)) + { + /* incompatible, can't access cache, so return before doing so */ + return NULL; + } + } + if (DistTableCacheHash == NULL) { InitializeDistTableCache(); @@ -1066,86 +1134,114 @@ CitusHasBeenLoaded(void) DistPartitionRelationId(); /* - * We also set installedExtensionVersion to NULL so that it will be re-read - * in case of extension update. + * We also reset citusVersionKnownCompatible, so it will be re-read in + * case of extension update. */ - installedExtensionVersion = NULL; + citusVersionKnownCompatible = false; } } - if (extensionLoaded) - { - ErrorIfAvailableVersionMismatch(); - ErrorIfInstalledVersionMismatch(); - } - return extensionLoaded; } /* - * ErrorIfAvailableExtensionVersionMismatch compares CITUS_EXTENSIONVERSION and - * currently available version from citus.control file. If they are not same in - * major or minor version numbers, this function errors out. It ignores the schema - * version. + * CheckCitusVersion checks whether there is a version mismatch between the + * available version and the loaded version or between the installed version + * and the loaded version. Returns true if compatible, false otherwise. + * + * As a side effect, this function also sets citusVersionKnownCompatible global + * variable to true which reduces version check cost of next calls. */ -void -ErrorIfAvailableVersionMismatch(void) +bool +CheckCitusVersion(int elevel) +{ + if (citusVersionKnownCompatible || + !CitusHasBeenLoaded() || + !EnableVersionChecks) + { + return true; + } + + if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel)) + { + citusVersionKnownCompatible = true; + return true; + } + else + { + return false; + } +} + + +/* + * CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently + * available version from the citus.control file. If they are not compatible, + * this function logs an error with the specified elevel and returns false, + * otherwise it returns true. + */ +static bool +CheckAvailableVersion(int elevel) { char *availableVersion = NULL; - if (!EnableVersionChecks) - { - return; - } + Assert(CitusHasBeenLoaded()); + Assert(EnableVersionChecks); availableVersion = AvailableExtensionVersion(); + if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION)) { - ereport(ERROR, (errmsg("loaded Citus library version differs from latest " - "available extension version"), - errdetail("Loaded library requires %s, but the latest control " - "file specifies %s.", CITUS_MAJORVERSION, - availableVersion), - errhint("Restart the database to load the latest Citus " - "library."))); + ereport(elevel, (errmsg("loaded Citus library version differs from latest " + "available extension version"), + errdetail("Loaded library requires %s, but the latest control " + "file specifies %s.", CITUS_MAJORVERSION, + availableVersion), + errhint("Restart the database to load the latest Citus " + "library."))); + return false; } + + return true; } /* - * ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and currently - * and catalog version from pg_extemsion catalog table. If they are not same in - * major or minor version numbers, this function errors out. It ignores the schema - * version. + * CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the the + * extension's current version from the pg_extemsion catalog table. If they + * are not compatible, this function logs an error with the specified elevel, + * otherwise it returns true. */ -static void -ErrorIfInstalledVersionMismatch(void) +static bool +CheckInstalledVersion(int elevel) { char *installedVersion = NULL; - if (!EnableVersionChecks) - { - return; - } + Assert(CitusHasBeenLoaded()); + Assert(EnableVersionChecks); installedVersion = InstalledExtensionVersion(); + if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION)) { - ereport(ERROR, (errmsg("loaded Citus library version differs from installed " - "extension version"), - errdetail("Loaded library requires %s, but the installed " - "extension version is %s.", CITUS_MAJORVERSION, - installedVersion), - errhint("Run ALTER EXTENSION citus UPDATE and try again."))); + ereport(elevel, (errmsg("loaded Citus library version differs from installed " + "extension version"), + errdetail("Loaded library requires %s, but the installed " + "extension version is %s.", CITUS_MAJORVERSION, + installedVersion), + errhint("Run ALTER EXTENSION citus UPDATE and try again."))); + return false; } + + return true; } /* - * MajorVersionsCompatible compares given two versions. If they are same in major - * and minor version numbers, this function returns true. It ignores the schema - * version. + * MajorVersionsCompatible checks whether both versions are compatible. They + * are if major and minor version numbers match, the schema version is + * ignored. Returns true if compatible, false otherwise. */ bool MajorVersionsCompatible(char *leftVersion, char *rightVersion) @@ -1203,12 +1299,7 @@ AvailableExtensionVersion(void) bool hasTuple = false; bool goForward = true; bool doCopy = false; - - /* if we cached the result before, return it*/ - if (availableExtensionVersion != NULL) - { - return availableExtensionVersion; - } + char *availableExtensionVersion; estate = CreateExecutorState(); extensionsResultSet = makeNode(ReturnSetInfo); @@ -1274,8 +1365,6 @@ AvailableExtensionVersion(void) /* * InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table. - * It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion - * will not read the catalog tables again. */ static char * InstalledExtensionVersion(void) @@ -1284,12 +1373,7 @@ InstalledExtensionVersion(void) SysScanDesc scandesc; ScanKeyData entry[1]; HeapTuple extensionTuple = NULL; - - /* if we cached the result before, return it*/ - if (installedExtensionVersion != NULL) - { - return installedExtensionVersion; - } + char *installedExtensionVersion = NULL; relation = heap_open(ExtensionRelationId, AccessShareLock); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index ae83940d4..14b0e86c6 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -18,7 +18,7 @@ #include "utils/hsearch.h" extern bool EnableVersionChecks; -extern char *availableExtensionVersion; +extern bool citusVersionKnownCompatible; /* * Representation of a table's metadata that is frequently used for @@ -79,7 +79,7 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern bool CitusHasBeenLoaded(void); -void ErrorIfAvailableVersionMismatch(void); +extern bool CheckCitusVersion(int elevel); bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); /* access WorkerNodeHash */ From 9fb15c439c1ce6816a8e7f247f4dde26d8102783 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Fri, 19 May 2017 19:11:57 +0300 Subject: [PATCH 3/4] Add version checks to necessary UDFs --- .../commands/create_distributed_table.c | 10 +++++-- .../commands/drop_distributed_table.c | 1 + .../distributed/master/master_citus_tools.c | 2 ++ .../distributed/master/master_create_shards.c | 1 + .../master/master_delete_protocol.c | 4 +++ .../master/master_expire_table_cache.c | 15 +++++++--- .../master/master_metadata_utility.c | 6 ++++ .../master/master_modify_multiple_shards.c | 2 ++ .../distributed/master/master_node_protocol.c | 8 ++++++ .../distributed/master/master_repair_shards.c | 1 + .../distributed/master/master_split_shards.c | 2 ++ .../master/master_stage_protocol.c | 23 +++++++++++---- .../distributed/metadata/metadata_sync.c | 2 ++ .../distributed/relay/relay_event_utility.c | 4 ++- .../transaction/transaction_recovery.c | 2 ++ .../distributed/utils/colocation_utils.c | 1 + .../distributed/utils/distribution_column.c | 5 ++++ .../distributed/utils/metadata_cache.c | 10 +++++++ src/backend/distributed/utils/node_metadata.c | 28 +++++++++++++++---- .../distributed/utils/reference_table_utils.c | 1 + src/backend/distributed/utils/resource_lock.c | 4 +++ .../worker/task_tracker_protocol.c | 15 ++++++++-- .../worker/worker_data_fetch_protocol.c | 19 +++++++++++++ .../distributed/worker/worker_drop_protocol.c | 5 +++- .../worker/worker_file_access_protocol.c | 5 ++++ .../worker/worker_merge_protocol.c | 7 +++++ .../worker/worker_partition_protocol.c | 5 ++++ .../worker/worker_truncate_trigger_protocol.c | 1 + 28 files changed, 168 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 825bf84d4..103fedc4f 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) bool requireEmpty = true; EnsureCoordinator(); + CheckCitusVersion(ERROR); if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) { @@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS) char *colocateWithTableName = NULL; EnsureCoordinator(); + CheckCitusVersion(ERROR); /* guard against a binary update without a function update */ if (PG_NARGS() >= 4) @@ -233,13 +235,17 @@ static void CreateReferenceTable(Oid relationId) { uint32 colocationId = INVALID_COLOCATION_ID; - List *workerNodeList = ActiveWorkerNodeList(); - int replicationFactor = list_length(workerNodeList); + List *workerNodeList = NIL; + int replicationFactor = 0; char *distributionColumnName = NULL; bool requireEmpty = true; char relationKind = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); + + workerNodeList = ActiveWorkerNodeList(); + replicationFactor = list_length(workerNodeList); /* if there are no workers, error out */ if (replicationFactor == 0) diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c index 76b29f1c8..734d478d3 100644 --- a/src/backend/distributed/commands/drop_distributed_table.c +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS) char *tableName = text_to_cstring(tableNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName); diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index b46b60739..946ec16cf 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS) int commandIndex = 0; int commandCount = 0; + CheckCitusVersion(ERROR); + /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) { diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 56ec7e788..a56f283bf 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) Oid distributedTableId = ResolveRelationId(tableNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index ed8418aa4..8f5943696 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) bool failOK = false; EnsureCoordinator(); + CheckCitusVersion(ERROR); queryTreeNode = ParseTreeNode(queryString); if (!IsA(queryTreeNode, DeleteStmt)) @@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS) char *relationName = text_to_cstring(relationNameText); EnsureCoordinator(); + CheckCitusVersion(ERROR); CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName); @@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS) StringInfo dropSeqCommand = makeStringInfo(); bool coordinator = IsCoordinator(); + CheckCitusVersion(ERROR); + /* do nothing if DDL propagation is switched off or this is not the coordinator */ if (!EnableDDLPropagation || !coordinator) { diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index 70eef82aa..8d4f38b03 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -46,14 +46,21 @@ Datum master_expire_table_cache(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); - List *workerNodeList = ActiveWorkerNodeList(); + DistTableCacheEntry *cacheEntry = NULL; + List *workerNodeList = NIL; ListCell *workerNodeCell = NULL; - int shardCount = cacheEntry->shardIntervalArrayLength; - ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray; + int shardCount = 0; + ShardInterval **shardIntervalArray = NULL; List **placementListArray = NULL; int shardIndex = 0; + CheckCitusVersion(ERROR); + + cacheEntry = DistributedTableCacheEntry(relationId); + workerNodeList = ActiveWorkerNodeList(); + shardCount = cacheEntry->shardIntervalArrayLength; + shardIntervalArray = cacheEntry->sortedShardIntervalArray; + if (shardCount == 0) { ereport(WARNING, (errmsg("Table has no shards, no action is taken"))); diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index e6bfa574b..4081ad35b 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 totalRelationSize = 0; + CheckCitusVersion(ERROR); + totalRelationSize = DistributedTableSize(relationId, PG_TOTAL_RELATION_SIZE_FUNCTION); @@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 tableSize = 0; + CheckCitusVersion(ERROR); + tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION); PG_RETURN_INT64(tableSize); @@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); uint64 relationSize = 0; + CheckCitusVersion(ERROR); + relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION); PG_RETURN_INT64(relationSize); diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 086846fde..f7838b08c 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) int32 affectedTupleCount = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); + queryTreeNode = ParseTreeNode(queryString); if (IsA(queryTreeNode, DeleteStmt)) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index db83b2665..d7214cf04 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS) Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; + CheckCitusVersion(ERROR); + /* find partition tuple for partitioned relation */ partitionEntry = DistributedTableCacheEntry(relationId); @@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) FuncCallContext *functionContext = NULL; ListCell *tableDDLEventCell = NULL; + CheckCitusVersion(ERROR); + /* * On the very first call to this function, we first use the given relation * name to get to the relation. We then recreate the list of DDL statements @@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS) Datum shardIdDatum = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); shardId = GetNextShardId(); shardIdDatum = Int64GetDatum(shardId); @@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS) Datum placementIdDatum = 0; EnsureCoordinator(); + CheckCitusVersion(ERROR); placementId = GetNextPlacementId(); placementIdDatum = Int64GetDatum(placementId); @@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) uint32 workerNodeIndex = 0; uint32 workerNodeCount = 0; + CheckCitusVersion(ERROR); + if (SRF_IS_FIRSTCALL()) { MemoryContext oldContext = NULL; diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 2471da365..1774895f8 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } EnsureCoordinator(); + CheckCitusVersion(ERROR); /* RepairShardPlacement function repairs only given shard */ RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, diff --git a/src/backend/distributed/master/master_split_shards.c b/src/backend/distributed/master/master_split_shards.c index cc18a3c3e..a1e928e5d 100644 --- a/src/backend/distributed/master/master_split_shards.c +++ b/src/backend/distributed/master/master_split_shards.c @@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS) FmgrInfo *hashFunction = NULL; Oid valueDataType = InvalidOid; + CheckCitusVersion(ERROR); + /* figure out hash function from the data type */ valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0); typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index f24bfc07c..cf492bc02 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) { text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); - List *workerNodeList = ActiveWorkerNodeList(); + List *workerNodeList = NIL; uint64 shardId = INVALID_SHARD_ID; List *ddlEventList = NULL; uint32 attemptableNodeCount = 0; @@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS) char replicationModel = REPLICATION_MODEL_INVALID; bool includeSequenceDefaults = false; + CheckCitusVersion(ERROR); + + workerNodeList = ActiveWorkerNodeList(); + EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); @@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) float4 shardFillLevel = 0.0; char partitionMethod = 0; - ShardInterval *shardInterval = LoadShardInterval(shardId); - Oid relationId = shardInterval->relationId; - bool cstoreTable = CStoreTable(relationId); + ShardInterval *shardInterval = NULL; + Oid relationId = InvalidOid; + bool cstoreTable = false; - char storageType = shardInterval->storageType; + char storageType = 0; + + CheckCitusVersion(ERROR); + + shardInterval = LoadShardInterval(shardId); + relationId = shardInterval->relationId; + cstoreTable = CStoreTable(relationId); + storageType = shardInterval->storageType; EnsureTablePermissions(relationId, ACL_INSERT); @@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS) int64 shardId = PG_GETARG_INT64(0); uint64 shardSize = 0; + CheckCitusVersion(ERROR); + shardSize = UpdateShardStatistics(shardId); PG_RETURN_INT64(shardSize); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index af1aa5257..e7a659c2b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); EnsureSuperUser(); + CheckCitusVersion(ERROR); PreventTransactionChain(true, "start_metadata_sync_to_node"); @@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); EnsureSuperUser(); + CheckCitusVersion(ERROR); workerNode = FindWorkerNode(nodeNameString, nodePort); if (workerNode == NULL) diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index eed0e881f..bc61da45e 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -28,6 +28,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" +#include "distributed/metadata_cache.h" #include "distributed/relay_utility.h" #include "lib/stringinfo.h" #include "mb/pg_wchar.h" @@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS) errmsg("shard_id cannot be null"))); } - relationId = PG_GETARG_OID(0); shardId = PG_GETARG_INT64(1); + CheckCitusVersion(ERROR); + if (shardId <= 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 70e16bd1d..edff64850 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) { int recoveredTransactionCount = 0; + CheckCitusVersion(ERROR); + recoveredTransactionCount = RecoverPreparedTransactions(); PG_RETURN_INT32(recoveredTransactionCount); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 7dcade9cd..a7357f20d 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS) } EnsureCoordinator(); + CheckCitusVersion(ERROR); relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 0238b6fdd..e2a925a04 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -17,6 +17,7 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "distributed/distribution_column.h" +#include "distributed/metadata_cache.h" #include "nodes/makefuncs.h" #include "nodes/nodes.h" #include "nodes/primnodes.h" @@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS) char *columnNodeString = NULL; text *columnNodeText = NULL; + CheckCitusVersion(ERROR); + relation = relation_open(relationId, AccessShareLock); column = BuildDistributionKeyFromColumnName(relation, columnName); @@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS) char *columnName = NULL; text *columnText = NULL; + CheckCitusVersion(ERROR); + columnName = ColumnNameToColumn(relationId, columnNodeString); columnText = cstring_to_text(columnName); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index d97ce3625..128461a6a 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1754,6 +1754,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1815,6 +1817,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1876,6 +1880,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + newTuple = triggerData->tg_newtuple; oldTuple = triggerData->tg_trigtuple; @@ -1932,6 +1938,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -1955,6 +1963,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } + CheckCitusVersion(ERROR); + CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 82916b85f..ecac82267 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -95,9 +95,12 @@ master_add_node(PG_FUNCTION_ARGS) bool hasMetadata = false; bool isActive = false; bool nodeAlreadyExists = false; + Datum nodeRecord; - Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + CheckCitusVersion(ERROR); + + nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); /* * After adding new node, if the node did not already exist, we will activate @@ -129,9 +132,12 @@ master_add_inactive_node(PG_FUNCTION_ARGS) bool hasMetadata = false; bool isActive = false; bool nodeAlreadyExists = false; + Datum nodeRecord; - Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, isActive, &nodeAlreadyExists); + CheckCitusVersion(ERROR); + + nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); PG_RETURN_CSTRING(nodeRecord); } @@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS) int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); + CheckCitusVersion(ERROR); + RemoveNodeFromCluster(nodeNameString, nodePort); PG_RETURN_VOID(); @@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS) bool hasShardPlacements = false; bool isActive = false; + CheckCitusVersion(ERROR); + DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); @@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS) char *nodeNameString = text_to_cstring(nodeName); Datum nodeRecord = 0; + CheckCitusVersion(ERROR); + nodeRecord = ActivateNode(nodeNameString, nodePort); PG_RETURN_CSTRING(nodeRecord); @@ -263,9 +275,12 @@ Datum master_initialize_node_metadata(PG_FUNCTION_ARGS) { ListCell *workerNodeCell = NULL; - List *workerNodes = ParseWorkerNodeFileAndRename(); + List *workerNodes = NULL; bool nodeAlreadyExists = false; + CheckCitusVersion(ERROR); + + workerNodes = ParseWorkerNodeFileAndRename(); foreach(workerNodeCell, workerNodes) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); @@ -273,7 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, workerNode->workerRack, false, workerNode->isActive, &nodeAlreadyExists); - } PG_RETURN_BOOL(true); @@ -292,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) char distributionMethod = 0; Oid relationId = InvalidOid; + CheckCitusVersion(ERROR); + /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore * we need to check all parameters for NULL values. diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 2b09f7878..ecda24eb1 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) DistTableCacheEntry *tableEntry = NULL; EnsureCoordinator(); + CheckCitusVersion(ERROR); if (!IsDistributedTable(relationId)) { diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 766bc166a..15daf3d53 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS) int shardIdCount = 0; int shardIdIndex = 0; + CheckCitusVersion(ERROR); + if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); @@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS) int shardIdCount = 0; int shardIdIndex = 0; + CheckCitusVersion(ERROR); + if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index aea2cdbd5..25634f953 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) char *taskCallString = text_to_cstring(taskCallStringText); uint32 taskCallStringLength = strlen(taskCallString); + bool taskTrackerRunning = false; + + CheckCitusVersion(ERROR); + /* check that we have a running task tracker on this host */ - bool taskTrackerRunning = TaskTrackerRunning(); + taskTrackerRunning = TaskTrackerRunning(); if (!taskTrackerRunning) { ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW), @@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS) WorkerTask *workerTask = NULL; uint32 taskStatus = 0; - bool taskTrackerRunning = TaskTrackerRunning(); + bool taskTrackerRunning = false; + + CheckCitusVersion(ERROR); + + taskTrackerRunning = TaskTrackerRunning(); + if (taskTrackerRunning) { LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); @@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) StringInfo jobDirectoryName = NULL; StringInfo jobSchemaName = NULL; + CheckCitusVersion(ERROR); + /* * We first clean up any open connections, and remove tasks belonging to * this job from the shared hash. diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 3fe98fdf4..4e1045f57 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -114,6 +114,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS) * task directory does not exist. We then lock and create the directory. */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); + + CheckCitusVersion(ERROR); + if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -155,6 +158,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS) * task directory does not exist. We then lock and create the directory. */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); + + CheckCitusVersion(ERROR); + if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -415,6 +421,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); + CheckCitusVersion(ERROR); + /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, @@ -443,6 +451,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); + CheckCitusVersion(ERROR); + /* extend names in ddl command and apply extended command */ RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, @@ -472,6 +482,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) Oid sequenceRelationId = InvalidOid; NodeTag nodeType = nodeTag(commandNode); + + CheckCitusVersion(ERROR); + if (nodeType != T_CreateSeqStmt) { ereport(ERROR, @@ -513,6 +526,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS) ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + CheckCitusVersion(ERROR); + /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. @@ -537,6 +552,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS) ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + CheckCitusVersion(ERROR); + /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. @@ -1211,6 +1228,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) bool received = false; StringInfo queryString = NULL; + CheckCitusVersion(ERROR); + /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index d802f643f..e55da62de 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 }; Relation distributedRelation = NULL; - List *shardList = LoadShardList(relationId); + List *shardList = NULL; ListCell *shardCell = NULL; char relationKind = '\0'; + CheckCitusVersion(ERROR); EnsureSuperUser(); + shardList = LoadShardList(relationId); + /* first check the relation type */ distributedRelation = relation_open(relationId, AccessShareLock); relationKind = distributedRelation->rd_rel->relkind; diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c index 546eaddea..9ec8882b2 100644 --- a/src/backend/distributed/worker/worker_file_access_protocol.c +++ b/src/backend/distributed/worker/worker_file_access_protocol.c @@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS) ForeignTable *foreignTable = GetForeignTable(relationId); ListCell *optionCell = NULL; + + CheckCitusVersion(ERROR); + foreach(optionCell, foreignTable->options) { DefElem *option = (DefElem *) lfirst(optionCell); @@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS) (void) blockId; (void) dataDirectoryObject; + CheckCitusVersion(ERROR); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("called function is currently unsupported"))); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 6754f4acd..f1714e709 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) /* we should have the same number of column names and types */ int32 columnNameCount = ArrayObjectCount(columnNameObject); int32 columnTypeCount = ArrayObjectCount(columnTypeObject); + + CheckCitusVersion(ERROR); + if (columnNameCount != columnTypeCount) { ereport(ERROR, (errmsg("column name array size: %d and type array size: %d" @@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) int createIntermediateTableResult = 0; int finished = 0; + CheckCitusVersion(ERROR); + /* * If the schema for the job isn't already created by the task tracker * protocol, we fall to using the default 'public' schema. @@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS) int scanKeyCount = 0; HeapTuple heapTuple = NULL; + CheckCitusVersion(ERROR); + pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock); scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index d7731f426..b15915f78 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -112,6 +112,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS) /* first check that array element's and partition column's types match */ Oid splitPointType = ARR_ELEMTYPE(splitPointObject); + + CheckCitusVersion(ERROR); + if (splitPointType != partitionColumnType) { ereport(ERROR, (errmsg("partition column type %u and split point type %u " @@ -184,6 +187,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) FileOutputStream *partitionFileArray = NULL; uint32 fileCount = partitionCount; + CheckCitusVersion(ERROR); + /* use column's type information to get the hashing function */ hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC); diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c index 237768e70..099c38fb8 100644 --- a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c +++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c @@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); EnsureSuperUser(); + CheckCitusVersion(ERROR); /* Create the truncate trigger */ CreateTruncateTrigger(relationId); From 7a7c74cc879b5fe0c209546d72b5c8e12f2bab72 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Fri, 19 May 2017 19:12:19 +0300 Subject: [PATCH 4/4] Add tests for version checks --- src/test/regress/expected/multi_extension.out | 61 +++++++++++++++++++ src/test/regress/sql/multi_extension.sql | 53 ++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 4236ed162..9715eb3b5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -109,6 +109,67 @@ CREATE EXTENSION citus VERSION '5.0'; ERROR: specified version incompatible with loaded Citus library DETAIL: Loaded library requires 7.0, but 5.0 was specified. HINT: If a newer library is present, restart the database and try the command again. +-- Test non-distributed queries work even in version mismatch +SET citus.enable_version_checks TO 'false'; +CREATE EXTENSION citus VERSION '6.2-1'; +SET citus.enable_version_checks TO 'true'; +-- Test CREATE TABLE +CREATE TABLE version_mismatch_table(column1 int); +-- Test COPY +\copy version_mismatch_table FROM STDIN; +-- Test INSERT +INSERT INTO version_mismatch_table(column1) VALUES(5); + +-- Test SELECT +SELECT * FROM version_mismatch_table ORDER BY column1; + column1 +--------- + 0 + 1 + 2 + 3 + 4 + 5 +(6 rows) + +-- Test SELECT from pg_catalog +SELECT d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges" +FROM pg_catalog.pg_database d +ORDER BY 1; + Name | Owner | Access privileges +------------+----------+----------------------- + postgres | postgres | + regression | postgres | + template0 | postgres | =c/postgres + + | | postgres=CTc/postgres + template1 | postgres | =c/postgres + + | | postgres=CTc/postgres +(4 rows) + +-- We should not distribute table in version mistmatch +SELECT create_distributed_table('version_mismatch_table', 'column1'); +ERROR: loaded Citus library version differs from installed extension version +DETAIL: Loaded library requires 7.0, but the installed extension version is 6.2-1. +HINT: Run ALTER EXTENSION citus UPDATE and try again. +-- This function will cause fail in next ALTER EXTENSION +CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass) +RETURNS bigint LANGUAGE plpgsql +AS $function$ +BEGIN +END; +$function$; +SET citus.enable_version_checks TO 'false'; +-- This will fail because of previous function declaration +ALTER EXTENSION citus UPDATE TO '6.2-2'; +ERROR: function "citus_table_size" already exists with same argument types +-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on +SET citus.enable_version_checks TO 'true'; +DROP FUNCTION citus_table_size(regclass); +SET citus.enable_version_checks TO 'false'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; -- re-create in newest version +DROP EXTENSION citus; \c CREATE EXTENSION citus; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index db625ecd6..327644f9c 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -101,6 +101,59 @@ RESET citus.enable_version_checks; DROP EXTENSION citus; CREATE EXTENSION citus VERSION '5.0'; +-- Test non-distributed queries work even in version mismatch +SET citus.enable_version_checks TO 'false'; +CREATE EXTENSION citus VERSION '6.2-1'; +SET citus.enable_version_checks TO 'true'; + +-- Test CREATE TABLE +CREATE TABLE version_mismatch_table(column1 int); + +-- Test COPY +\copy version_mismatch_table FROM STDIN; +0 +1 +2 +3 +4 +\. + +-- Test INSERT +INSERT INTO version_mismatch_table(column1) VALUES(5); + +-- Test SELECT +SELECT * FROM version_mismatch_table ORDER BY column1; + +-- Test SELECT from pg_catalog +SELECT d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges" +FROM pg_catalog.pg_database d +ORDER BY 1; + +-- We should not distribute table in version mistmatch +SELECT create_distributed_table('version_mismatch_table', 'column1'); + +-- This function will cause fail in next ALTER EXTENSION +CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass) +RETURNS bigint LANGUAGE plpgsql +AS $function$ +BEGIN +END; +$function$; + +SET citus.enable_version_checks TO 'false'; +-- This will fail because of previous function declaration +ALTER EXTENSION citus UPDATE TO '6.2-2'; + +-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on +SET citus.enable_version_checks TO 'true'; +DROP FUNCTION citus_table_size(regclass); + +SET citus.enable_version_checks TO 'false'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; + -- re-create in newest version +DROP EXTENSION citus; \c CREATE EXTENSION citus;