From 9fb15c439c1ce6816a8e7f247f4dde26d8102783 Mon Sep 17 00:00:00 2001
From: Burak Yucesoy
Date: Fri, 19 May 2017 19:11:57 +0300
Subject: [PATCH] Add version checks to necessary UDFs

---
 .../commands/create_distributed_table.c       | 10 +++++--
 .../commands/drop_distributed_table.c         |  1 +
 .../distributed/master/master_citus_tools.c   |  2 ++
 .../distributed/master/master_create_shards.c |  1 +
 .../master/master_delete_protocol.c           |  4 +++
 .../master/master_expire_table_cache.c        | 15 +++++++---
 .../master/master_metadata_utility.c          |  6 ++++
 .../master/master_modify_multiple_shards.c    |  2 ++
 .../distributed/master/master_node_protocol.c |  8 ++++++
 .../distributed/master/master_repair_shards.c |  1 +
 .../distributed/master/master_split_shards.c  |  2 ++
 .../master/master_stage_protocol.c            | 23 +++++++++++----
 .../distributed/metadata/metadata_sync.c      |  2 ++
 .../distributed/relay/relay_event_utility.c   |  4 ++-
 .../transaction/transaction_recovery.c        |  2 ++
 .../distributed/utils/colocation_utils.c      |  1 +
 .../distributed/utils/distribution_column.c   |  5 ++++
 .../distributed/utils/metadata_cache.c        | 10 +++++++
 src/backend/distributed/utils/node_metadata.c | 28 +++++++++++++++----
 .../distributed/utils/reference_table_utils.c |  1 +
 src/backend/distributed/utils/resource_lock.c |  4 +++
 .../worker/task_tracker_protocol.c            | 15 ++++++++--
 .../worker/worker_data_fetch_protocol.c       | 19 +++++++++++++
 .../distributed/worker/worker_drop_protocol.c |  5 +++-
 .../worker/worker_file_access_protocol.c      |  5 ++++
 .../worker/worker_merge_protocol.c            |  7 +++++
 .../worker/worker_partition_protocol.c        |  5 ++++
 .../worker/worker_truncate_trigger_protocol.c |  1 +
 28 files changed, 168 insertions(+), 21 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 825bf84d4..103fedc4f 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	bool requireEmpty = true;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
 	{
@@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	char *colocateWithTableName = NULL;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	/* guard against a binary update without a function update */
 	if (PG_NARGS() >= 4)
@@ -233,13 +235,17 @@ static void
 CreateReferenceTable(Oid relationId)
 {
 	uint32 colocationId = INVALID_COLOCATION_ID;
-	List *workerNodeList = ActiveWorkerNodeList();
-	int replicationFactor = list_length(workerNodeList);
+	List *workerNodeList = NIL;
+	int replicationFactor = 0;
 	char *distributionColumnName = NULL;
 	bool requireEmpty = true;
 	char relationKind = 0;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
+
+	workerNodeList = ActiveWorkerNodeList();
+	replicationFactor = list_length(workerNodeList);
 
 	/* if there are no workers, error out */
 	if (replicationFactor == 0)
diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c
index 76b29f1c8..734d478d3 100644
--- a/src/backend/distributed/commands/drop_distributed_table.c
+++ b/src/backend/distributed/commands/drop_distributed_table.c
@@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
 	char *tableName = text_to_cstring(tableNameText);
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);
 
diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c
index b46b60739..946ec16cf 100644
--- a/src/backend/distributed/master/master_citus_tools.c
+++ b/src/backend/distributed/master/master_citus_tools.c
@@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS)
 	int commandIndex = 0;
 	int commandCount = 0;
 
+	CheckCitusVersion(ERROR);
+
 	/* check to see if caller supports us returning a tuplestore */
 	if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
 	{
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index 56ec7e788..a56f283bf 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	Oid distributedTableId = ResolveRelationId(tableNameText);
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
 
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index ed8418aa4..8f5943696 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
 	bool failOK = false;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	queryTreeNode = ParseTreeNode(queryString);
 	if (!IsA(queryTreeNode, DeleteStmt))
@@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
 	char *relationName = text_to_cstring(relationNameText);
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
 
@@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
 	StringInfo dropSeqCommand = makeStringInfo();
 	bool coordinator = IsCoordinator();
 
+	CheckCitusVersion(ERROR);
+
 	/* do nothing if DDL propagation is switched off or this is not the coordinator */
 	if (!EnableDDLPropagation || !coordinator)
 	{
diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c
index 70eef82aa..8d4f38b03 100644
--- a/src/backend/distributed/master/master_expire_table_cache.c
+++ b/src/backend/distributed/master/master_expire_table_cache.c
@@ -46,14 +46,21 @@ Datum
 master_expire_table_cache(PG_FUNCTION_ARGS)
 {
 	Oid relationId = PG_GETARG_OID(0);
-	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
-	List *workerNodeList = ActiveWorkerNodeList();
+	DistTableCacheEntry *cacheEntry = NULL;
+	List *workerNodeList = NIL;
 	ListCell *workerNodeCell = NULL;
-	int shardCount = cacheEntry->shardIntervalArrayLength;
-	ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
+	int shardCount = 0;
+	ShardInterval **shardIntervalArray = NULL;
 	List **placementListArray = NULL;
 	int shardIndex = 0;
 
+	CheckCitusVersion(ERROR);
+
+	cacheEntry = DistributedTableCacheEntry(relationId);
+	workerNodeList = ActiveWorkerNodeList();
+	shardCount = cacheEntry->shardIntervalArrayLength;
+	shardIntervalArray = cacheEntry->sortedShardIntervalArray;
+
 	if (shardCount == 0)
 	{
 		ereport(WARNING, (errmsg("Table has no shards, no action is taken")));
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index e6bfa574b..4081ad35b 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
 	Oid relationId = PG_GETARG_OID(0);
 	uint64 totalRelationSize = 0;
 
+	CheckCitusVersion(ERROR);
+
 	totalRelationSize = DistributedTableSize(relationId,
 											 PG_TOTAL_RELATION_SIZE_FUNCTION);
 
@@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS)
 	Oid relationId = PG_GETARG_OID(0);
 	uint64 tableSize = 0;
 
+	CheckCitusVersion(ERROR);
+
 	tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION);
 
 	PG_RETURN_INT64(tableSize);
@@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS)
 	Oid relationId = PG_GETARG_OID(0);
 	uint64 relationSize = 0;
 
+	CheckCitusVersion(ERROR);
+
 	relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION);
 
 	PG_RETURN_INT64(relationSize);
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 086846fde..f7838b08c 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	int32 affectedTupleCount = 0;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
+
 	queryTreeNode = ParseTreeNode(queryString);
 
 	if (IsA(queryTreeNode, DeleteStmt))
diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index db83b2665..d7214cf04 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
 	Datum values[TABLE_METADATA_FIELDS];
 	bool isNulls[TABLE_METADATA_FIELDS];
 
+	CheckCitusVersion(ERROR);
+
 	/* find partition tuple for partitioned relation */
 	partitionEntry = DistributedTableCacheEntry(relationId);
 
@@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
 	FuncCallContext *functionContext = NULL;
 	ListCell *tableDDLEventCell = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	/*
 	 * On the very first call to this function, we first use the given relation
 	 * name to get to the relation. We then recreate the list of DDL statements
@@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
 	Datum shardIdDatum = 0;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	shardId = GetNextShardId();
 	shardIdDatum = Int64GetDatum(shardId);
@@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
 	Datum placementIdDatum = 0;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	placementId = GetNextPlacementId();
 	placementIdDatum = Int64GetDatum(placementId);
@@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
 	uint32 workerNodeIndex = 0;
 	uint32 workerNodeCount = 0;
 
+	CheckCitusVersion(ERROR);
+
 	if (SRF_IS_FIRSTCALL())
 	{
 		MemoryContext oldContext = NULL;
diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c
index 2471da365..1774895f8 100644
--- a/src/backend/distributed/master/master_repair_shards.c
+++ b/src/backend/distributed/master/master_repair_shards.c
@@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
 	}
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	/* RepairShardPlacement function repairs only given shard */
 	RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
diff --git a/src/backend/distributed/master/master_split_shards.c b/src/backend/distributed/master/master_split_shards.c
index cc18a3c3e..a1e928e5d 100644
--- a/src/backend/distributed/master/master_split_shards.c
+++ b/src/backend/distributed/master/master_split_shards.c
@@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS)
 	FmgrInfo *hashFunction = NULL;
 	Oid valueDataType = InvalidOid;
 
+	CheckCitusVersion(ERROR);
+
 	/* figure out hash function from the data type */
 	valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0);
 	typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO);
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index f24bfc07c..cf492bc02 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 {
 	text *relationNameText = PG_GETARG_TEXT_P(0);
 	char *relationName = text_to_cstring(relationNameText);
-	List *workerNodeList = ActiveWorkerNodeList();
+	List *workerNodeList = NIL;
 	uint64 shardId = INVALID_SHARD_ID;
 	List *ddlEventList = NULL;
 	uint32 attemptableNodeCount = 0;
@@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	char replicationModel = REPLICATION_MODEL_INVALID;
 	bool includeSequenceDefaults = false;
 
+	CheckCitusVersion(ERROR);
+
+	workerNodeList = ActiveWorkerNodeList();
+
 	EnsureTablePermissions(relationId, ACL_INSERT);
 
 	CheckDistributedTable(relationId);
@@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	float4 shardFillLevel = 0.0;
 	char partitionMethod = 0;
 
-	ShardInterval *shardInterval = LoadShardInterval(shardId);
-	Oid relationId = shardInterval->relationId;
-	bool cstoreTable = CStoreTable(relationId);
+	ShardInterval *shardInterval = NULL;
+	Oid relationId = InvalidOid;
+	bool cstoreTable = false;
 
-	char storageType = shardInterval->storageType;
+	char storageType = 0;
+
+	CheckCitusVersion(ERROR);
+
+	shardInterval = LoadShardInterval(shardId);
+	relationId = shardInterval->relationId;
+	cstoreTable = CStoreTable(relationId);
+	storageType = shardInterval->storageType;
 
 	EnsureTablePermissions(relationId, ACL_INSERT);
 
@@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS)
 	int64 shardId = PG_GETARG_INT64(0);
 	uint64 shardSize = 0;
 
+	CheckCitusVersion(ERROR);
+
 	shardSize = UpdateShardStatistics(shardId);
 
 	PG_RETURN_INT64(shardSize);
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index af1aa5257..e7a659c2b 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
 
 	EnsureCoordinator();
 	EnsureSuperUser();
+	CheckCitusVersion(ERROR);
 
 	PreventTransactionChain(true, "start_metadata_sync_to_node");
 
@@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
 
 	EnsureCoordinator();
 	EnsureSuperUser();
+	CheckCitusVersion(ERROR);
 
 	workerNode = FindWorkerNode(nodeNameString, nodePort);
 	if (workerNode == NULL)
diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c
index eed0e881f..bc61da45e 100644
--- a/src/backend/distributed/relay/relay_event_utility.c
+++ b/src/backend/distributed/relay/relay_event_utility.c
@@ -28,6 +28,7 @@
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_constraint.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/relay_utility.h"
 #include "lib/stringinfo.h"
 #include "mb/pg_wchar.h"
@@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS)
 						errmsg("shard_id cannot be null")));
 	}
 
-	relationId = PG_GETARG_OID(0);
 	shardId = PG_GETARG_INT64(1);
 
+	CheckCitusVersion(ERROR);
+
 	if (shardId <= 0)
 	{
 		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c
index 70e16bd1d..edff64850 100644
--- a/src/backend/distributed/transaction/transaction_recovery.c
+++ b/src/backend/distributed/transaction/transaction_recovery.c
@@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
 {
 	int recoveredTransactionCount = 0;
 
+	CheckCitusVersion(ERROR);
+
 	recoveredTransactionCount = RecoverPreparedTransactions();
 
 	PG_RETURN_INT32(recoveredTransactionCount);
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index 7dcade9cd..a7357f20d 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
 	}
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
 
diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c
index 0238b6fdd..e2a925a04 100644
--- a/src/backend/distributed/utils/distribution_column.c
+++ b/src/backend/distributed/utils/distribution_column.c
@@ -17,6 +17,7 @@
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "distributed/distribution_column.h"
+#include "distributed/metadata_cache.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodes.h"
 #include "nodes/primnodes.h"
@@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS)
 	char *columnNodeString = NULL;
 	text *columnNodeText = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	relation = relation_open(relationId, AccessShareLock);
 
 	column = BuildDistributionKeyFromColumnName(relation, columnName);
@@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS)
 	char *columnName = NULL;
 	text *columnText = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	columnName = ColumnNameToColumn(relationId, columnNodeString);
 
 	columnText = cstring_to_text(columnName);
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index d97ce3625..128461a6a 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -1754,6 +1754,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
 						errmsg("must be called as trigger")));
 	}
 
+	CheckCitusVersion(ERROR);
+
 	newTuple = triggerData->tg_newtuple;
 	oldTuple = triggerData->tg_trigtuple;
 
@@ -1815,6 +1817,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
 						errmsg("must be called as trigger")));
 	}
 
+	CheckCitusVersion(ERROR);
+
 	newTuple = triggerData->tg_newtuple;
 	oldTuple = triggerData->tg_trigtuple;
 
@@ -1876,6 +1880,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
 						errmsg("must be called as trigger")));
 	}
 
+	CheckCitusVersion(ERROR);
+
 	newTuple = triggerData->tg_newtuple;
 	oldTuple = triggerData->tg_trigtuple;
 
@@ -1932,6 +1938,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
 						errmsg("must be called as trigger")));
 	}
 
+	CheckCitusVersion(ERROR);
+
 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());
 
 	PG_RETURN_DATUM(PointerGetDatum(NULL));
@@ -1955,6 +1963,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
 						errmsg("must be called as trigger")));
 	}
 
+	CheckCitusVersion(ERROR);
+
 	CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
 
 	PG_RETURN_DATUM(PointerGetDatum(NULL));
diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c
index 82916b85f..ecac82267 100644
--- a/src/backend/distributed/utils/node_metadata.c
+++ b/src/backend/distributed/utils/node_metadata.c
@@ -95,9 +95,12 @@ master_add_node(PG_FUNCTION_ARGS)
 	bool hasMetadata = false;
 	bool isActive = false;
 	bool nodeAlreadyExists = false;
+	Datum nodeRecord;
 
-	Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
-									   hasMetadata, isActive, &nodeAlreadyExists);
+	CheckCitusVersion(ERROR);
+
+	nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
+								 hasMetadata, isActive, &nodeAlreadyExists);
 
 	/*
 	 * After adding new node, if the node did not already exist, we will activate
@@ -129,9 +132,12 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
 	bool hasMetadata = false;
 	bool isActive = false;
 	bool nodeAlreadyExists = false;
+	Datum nodeRecord;
 
-	Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
-									   hasMetadata, isActive, &nodeAlreadyExists);
+	CheckCitusVersion(ERROR);
+
+	nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
+								 hasMetadata, isActive, &nodeAlreadyExists);
 
 	PG_RETURN_CSTRING(nodeRecord);
 }
@@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS)
 	int32 nodePort = PG_GETARG_INT32(1);
 	char *nodeNameString = text_to_cstring(nodeName);
 
+	CheckCitusVersion(ERROR);
+
 	RemoveNodeFromCluster(nodeNameString, nodePort);
 
 	PG_RETURN_VOID();
@@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS)
 	bool hasShardPlacements = false;
 	bool isActive = false;
 
+	CheckCitusVersion(ERROR);
+
 	DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
 
 	hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
@@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS)
 	char *nodeNameString = text_to_cstring(nodeName);
 	Datum nodeRecord = 0;
 
+	CheckCitusVersion(ERROR);
+
 	nodeRecord = ActivateNode(nodeNameString, nodePort);
 
 	PG_RETURN_CSTRING(nodeRecord);
@@ -263,9 +275,12 @@ Datum
 master_initialize_node_metadata(PG_FUNCTION_ARGS)
 {
 	ListCell *workerNodeCell = NULL;
-	List *workerNodes = ParseWorkerNodeFileAndRename();
+	List *workerNodes = NULL;
 	bool nodeAlreadyExists = false;
 
+	CheckCitusVersion(ERROR);
+
+	workerNodes = ParseWorkerNodeFileAndRename();
 	foreach(workerNodeCell, workerNodes)
 	{
 		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
@@ -273,7 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
 		AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
 						workerNode->workerRack, false, workerNode->isActive,
 						&nodeAlreadyExists);
-
 	}
 
 	PG_RETURN_BOOL(true);
@@ -292,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
 	char distributionMethod = 0;
 	Oid relationId = InvalidOid;
 
+	CheckCitusVersion(ERROR);
+
 	/*
	 * To have optional parameter as NULL, we defined this UDF as not strict, therefore
	 * we need to check all parameters for NULL values.
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 2b09f7878..ecda24eb1 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
 	DistTableCacheEntry *tableEntry = NULL;
 
 	EnsureCoordinator();
+	CheckCitusVersion(ERROR);
 
 	if (!IsDistributedTable(relationId))
 	{
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index 766bc166a..15daf3d53 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS)
 	int shardIdCount = 0;
 	int shardIdIndex = 0;
 
+	CheckCitusVersion(ERROR);
+
 	if (ARR_NDIM(shardIdArrayObject) == 0)
 	{
 		ereport(ERROR, (errmsg("no locks specified")));
@@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS)
 	int shardIdCount = 0;
 	int shardIdIndex = 0;
 
+	CheckCitusVersion(ERROR);
+
 	if (ARR_NDIM(shardIdArrayObject) == 0)
 	{
 		ereport(ERROR, (errmsg("no locks specified")));
diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c
index aea2cdbd5..25634f953 100644
--- a/src/backend/distributed/worker/task_tracker_protocol.c
+++ b/src/backend/distributed/worker/task_tracker_protocol.c
@@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
 	char *taskCallString = text_to_cstring(taskCallStringText);
 	uint32 taskCallStringLength = strlen(taskCallString);
 
+	bool taskTrackerRunning = false;
+
+	CheckCitusVersion(ERROR);
+
 	/* check that we have a running task tracker on this host */
-	bool taskTrackerRunning = TaskTrackerRunning();
+	taskTrackerRunning = TaskTrackerRunning();
 	if (!taskTrackerRunning)
 	{
 		ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW),
@@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
 	WorkerTask *workerTask = NULL;
 	uint32 taskStatus = 0;
 
-	bool taskTrackerRunning = TaskTrackerRunning();
+	bool taskTrackerRunning = false;
+
+	CheckCitusVersion(ERROR);
+
+	taskTrackerRunning = TaskTrackerRunning();
+
 	if (taskTrackerRunning)
 	{
 		LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
@@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
 	StringInfo jobDirectoryName = NULL;
 	StringInfo jobSchemaName = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	/*
	 * We first clean up any open connections, and remove tasks belonging to
	 * this job from the shared hash.
diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c
index 3fe98fdf4..4e1045f57 100644
--- a/src/backend/distributed/worker/worker_data_fetch_protocol.c
+++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c
@@ -114,6 +114,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
 	 * task directory does not exist. We then lock and create the directory.
 	 */
 	bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
+
+	CheckCitusVersion(ERROR);
+
 	if (!taskDirectoryExists)
 	{
 		InitTaskDirectory(jobId, upstreamTaskId);
@@ -155,6 +158,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
 	 * task directory does not exist. We then lock and create the directory.
 	 */
 	bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
+
+	CheckCitusVersion(ERROR);
+
 	if (!taskDirectoryExists)
 	{
 		InitTaskDirectory(jobId, upstreamTaskId);
@@ -415,6 +421,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
 	const char *ddlCommand = text_to_cstring(ddlCommandText);
 	Node *ddlCommandNode = ParseTreeNode(ddlCommand);
 
+	CheckCitusVersion(ERROR);
+
 	/* extend names in ddl command and apply extended command */
 	RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
 	ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
@@ -443,6 +451,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
 	const char *ddlCommand = text_to_cstring(ddlCommandText);
 	Node *ddlCommandNode = ParseTreeNode(ddlCommand);
 
+	CheckCitusVersion(ERROR);
+
 	/* extend names in ddl command and apply extended command */
 	RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
 											   leftShardSchemaName, rightShardId,
@@ -472,6 +482,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
 	Oid sequenceRelationId = InvalidOid;
 
 	NodeTag nodeType = nodeTag(commandNode);
+
+	CheckCitusVersion(ERROR);
+
 	if (nodeType != T_CreateSeqStmt)
 	{
 		ereport(ERROR,
@@ -513,6 +526,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
 	ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
 	ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
 
+	CheckCitusVersion(ERROR);
+
 	/*
	 * Run common logic to fetch the remote table, and use the provided function
	 * pointer to perform the actual table fetching.
@@ -537,6 +552,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
 	ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
 	ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
 
+	CheckCitusVersion(ERROR);
+
 	/*
	 * Run common logic to fetch the remote table, and use the provided function
	 * pointer to perform the actual table fetching.
@@ -1211,6 +1228,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
 	bool received = false;
 	StringInfo queryString = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	/* We extract schema names and table names from qualified names */
 	DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName,
 							 &shardTableName);
diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c
index d802f643f..e55da62de 100644
--- a/src/backend/distributed/worker/worker_drop_protocol.c
+++ b/src/backend/distributed/worker/worker_drop_protocol.c
@@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
 	ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
 	Relation distributedRelation = NULL;
-	List *shardList = LoadShardList(relationId);
+	List *shardList = NIL;
 	ListCell *shardCell = NULL;
 
 	char relationKind = '\0';
 
+	CheckCitusVersion(ERROR);
 	EnsureSuperUser();
 
+	shardList = LoadShardList(relationId);
+
 	/* first check the relation type */
 	distributedRelation = relation_open(relationId, AccessShareLock);
 	relationKind = distributedRelation->rd_rel->relkind;
diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c
index 546eaddea..9ec8882b2 100644
--- a/src/backend/distributed/worker/worker_file_access_protocol.c
+++ b/src/backend/distributed/worker/worker_file_access_protocol.c
@@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
 
 	ForeignTable *foreignTable = GetForeignTable(relationId);
 	ListCell *optionCell = NULL;
+
+	CheckCitusVersion(ERROR);
+
 	foreach(optionCell, foreignTable->options)
 	{
 		DefElem *option = (DefElem *) lfirst(optionCell);
@@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS)
 	(void) blockId;
 	(void) dataDirectoryObject;
 
+	CheckCitusVersion(ERROR);
+
 	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					errmsg("called function is currently unsupported")));
 
diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c
index 6754f4acd..f1714e709 100644
--- a/src/backend/distributed/worker/worker_merge_protocol.c
+++ b/src/backend/distributed/worker/worker_merge_protocol.c
@@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
 	/* we should have the same number of column names and types */
 	int32 columnNameCount = ArrayObjectCount(columnNameObject);
 	int32 columnTypeCount = ArrayObjectCount(columnTypeObject);
+
+	CheckCitusVersion(ERROR);
+
 	if (columnNameCount != columnTypeCount)
 	{
 		ereport(ERROR, (errmsg("column name array size: %d and type array size: %d"
@@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
 	int createIntermediateTableResult = 0;
 	int finished = 0;
 
+	CheckCitusVersion(ERROR);
+
 	/*
	 * If the schema for the job isn't already created by the task tracker
	 * protocol, we fall to using the default 'public' schema.
@@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS)
 	int scanKeyCount = 0;
 	HeapTuple heapTuple = NULL;
 
+	CheckCitusVersion(ERROR);
+
 	pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock);
 	scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey);
 
diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c
index d7731f426..b15915f78 100644
--- a/src/backend/distributed/worker/worker_partition_protocol.c
+++ b/src/backend/distributed/worker/worker_partition_protocol.c
@@ -112,6 +112,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
 
 	/* first check that array element's and partition column's types match */
 	Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
+
+	CheckCitusVersion(ERROR);
+
 	if (splitPointType != partitionColumnType)
 	{
 		ereport(ERROR, (errmsg("partition column type %u and split point type %u "
@@ -184,6 +187,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 	FileOutputStream *partitionFileArray = NULL;
 	uint32 fileCount = partitionCount;
 
+	CheckCitusVersion(ERROR);
+
 	/* use column's type information to get the hashing function */
 	hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC);
 
diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c
index 237768e70..099c38fb8 100644
--- a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c
+++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c
@@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS)
 	Oid relationId = PG_GETARG_OID(0);
 
 	EnsureSuperUser();
+	CheckCitusVersion(ERROR);
 
 	/* Create the truncate trigger */
 	CreateTruncateTrigger(relationId);