diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 84e47eac7..17b4264ba 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -219,11 +219,11 @@ PG_FUNCTION_INFO_V1(worker_change_sequence_dependency); Datum undistribute_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool cascadeViaForeignKeys = PG_GETARG_BOOL(1); - CheckCitusVersion(ERROR); - TableConversionParameters params = { .relationId = relationId, .cascadeViaForeignKeys = cascadeViaForeignKeys @@ -243,6 +243,8 @@ undistribute_table(PG_FUNCTION_ARGS) Datum alter_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); char *distributionColumn = NULL; @@ -280,9 +282,6 @@ alter_distributed_table(PG_FUNCTION_ARGS) } } - CheckCitusVersion(ERROR); - - TableConversionParameters params = { .relationId = relationId, .distributionColumn = distributionColumn, @@ -305,13 +304,13 @@ alter_distributed_table(PG_FUNCTION_ARGS) Datum alter_table_set_access_method(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *accessMethodText = PG_GETARG_TEXT_P(1); char *accessMethod = text_to_cstring(accessMethodText); - CheckCitusVersion(ERROR); - TableConversionParameters params = { .relationId = relationId, .accessMethod = accessMethod diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 2f03ccb17..1120a5b92 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -142,12 +142,11 @@ PG_FUNCTION_INFO_V1(create_reference_table); Datum master_create_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); char *colocateWithTableName = NULL; @@ -189,6 +188,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3)) { PG_RETURN_VOID(); @@ -221,8 +222,6 @@ create_distributed_table(PG_FUNCTION_ARGS) shardCountIsStrict = true; } - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); /* enable create_distributed_table on an empty node */ @@ -271,6 +270,7 @@ create_distributed_table(PG_FUNCTION_ARGS) Datum create_reference_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); char *colocateWithTableName = NULL; @@ -278,8 +278,6 @@ create_reference_table(PG_FUNCTION_ARGS) bool viaDeprecatedAPI = false; - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); /* enable create_reference_table on an empty node */ diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c index 536e27206..79adf02a9 100644 --- a/src/backend/distributed/commands/drop_distributed_table.c +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -59,6 +59,8 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS) Datum master_remove_partition_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *tableNameText = PG_GETARG_TEXT_P(2); @@ -66,8 +68,6 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *tableName = text_to_cstring(tableNameText); - CheckCitusVersion(ERROR); - /* * The SQL_DROP trigger calls this function even for tables that are * not distributed. In that case, silently ignore. This is not very @@ -97,6 +97,8 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS) Datum master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *tableNameText = PG_GETARG_TEXT_P(2); @@ -104,8 +106,6 @@ master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *tableName = text_to_cstring(tableNameText); - CheckCitusVersion(ERROR); - CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName); MasterRemoveDistributedTableMetadataFromWorkers(relationId, schemaName, tableName); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index 03a2c98f3..185f788e5 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -174,10 +174,11 @@ TruncateTaskList(Oid relationId) Datum truncate_local_data_after_distributing_table(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); EnsureCoordinator(); + + Oid relationId = PG_GETARG_OID(0); + EnsureLocalTableCanBeTruncated(relationId); TruncateStmt *truncateStmt = makeNode(TruncateStmt); diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index 19bc93ae6..efe14b2ad 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -111,9 +111,9 @@ PG_FUNCTION_INFO_V1(citus_reserved_connection_stats); Datum citus_reserved_connection_stats(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllReservedConnections(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 649f2e9b5..89fb1cd19 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -136,9 +136,9 @@ PG_FUNCTION_INFO_V1(citus_remote_connection_stats); Datum citus_remote_connection_stats(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c45e64821..c1b8f86dc 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -118,6 +118,8 @@ PG_FUNCTION_INFO_V1(fetch_intermediate_results); Datum broadcast_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *resultIdText = PG_GETARG_TEXT_P(0); char *resultIdString = text_to_cstring(resultIdText); text *queryText = PG_GETARG_TEXT_P(1); @@ -125,8 +127,6 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) bool writeLocalFile = false; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - /* * Make sure that this transaction has a distributed transaction ID. * @@ -159,6 +159,8 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) Datum create_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *resultIdText = PG_GETARG_TEXT_P(0); char *resultIdString = text_to_cstring(resultIdText); text *queryText = PG_GETARG_TEXT_P(1); @@ -167,8 +169,6 @@ create_intermediate_result(PG_FUNCTION_ARGS) bool writeLocalFile = true; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - /* * Make sure that this transaction has a distributed transaction ID. * @@ -771,13 +771,13 @@ IntermediateResultSize(const char *resultId) Datum read_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Datum resultId = PG_GETARG_DATUM(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); - CheckCitusVersion(ERROR); - ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1); PG_RETURN_DATUM(0); @@ -794,14 +794,14 @@ read_intermediate_result(PG_FUNCTION_ARGS) Datum read_intermediate_result_array(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); - CheckCitusVersion(ERROR); - int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS( resultIdObject)); Datum *resultIdArray = DeconstructArrayObject(resultIdObject); @@ -874,6 +874,8 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, Datum fetch_intermediate_results(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); Datum *resultIdArray = DeconstructArrayObject(resultIdObject); int32 resultCount = ArrayObjectCount(resultIdObject); @@ -885,8 +887,6 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) int resultIndex = 0; int64 totalBytesWritten = 0L; - CheckCitusVersion(ERROR); - if (resultCount == 0) { PG_RETURN_INT64(0); diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 3b481a231..c0f6e9d65 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -107,6 +107,8 @@ PG_FUNCTION_INFO_V1(worker_partition_query_result); Datum worker_partition_query_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo; text *resultIdPrefixText = PG_GETARG_TEXT_P(0); @@ -136,8 +138,6 @@ worker_partition_query_result(PG_FUNCTION_ARGS) bool binaryCopy = PG_GETARG_BOOL(6); - CheckCitusVersion(ERROR); - if (!IsMultiStatementTransaction()) { ereport(ERROR, (errmsg("worker_partition_query_result can only be used in a " diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index aa5a61886..144334852 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -2641,6 +2641,8 @@ SecondaryNodeRoleId(void) Datum citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldLogicalRelationId = InvalidOid; Oid newLogicalRelationId = InvalidOid; @@ -2651,8 +2653,6 @@ citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - if (RelationGetRelid(triggerData->tg_relation) != DistPartitionRelationId()) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), @@ -2718,6 +2718,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldLogicalRelationId = InvalidOid; Oid newLogicalRelationId = InvalidOid; @@ -2728,8 +2730,6 @@ citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - if (RelationGetRelid(triggerData->tg_relation) != DistShardRelationId()) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), @@ -2795,6 +2795,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldShardId = InvalidOid; Oid newShardId = InvalidOid; @@ -2805,8 +2807,6 @@ citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - /* * Before 7.0-2 this trigger is on pg_dist_shard_placement, * ignore trigger in this scenario. @@ -2884,14 +2884,14 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_node_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistNodeRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2919,14 +2919,14 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_conninfo_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - /* no-op in community edition */ PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2954,14 +2954,14 @@ master_dist_authinfo_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2989,14 +2989,14 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_object_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistObjectRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 686a206b4..48176495d 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -103,6 +103,8 @@ static bool got_SIGALRM = false; Datum start_metadata_sync_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); @@ -126,10 +128,10 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) /* fail if metadata synchronization doesn't succeed */ bool raiseInterrupts = true; + CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureSuperUser(); EnsureModificationsCanRun(); - CheckCitusVersion(ERROR); PreventInTransactionBlock(true, "start_metadata_sync_to_node"); @@ -185,14 +187,14 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) Datum stop_metadata_sync_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + EnsureSuperUser(); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - EnsureCoordinator(); - EnsureSuperUser(); - CheckCitusVersion(ERROR); - LockRelationOid(DistNodeRelationId(), ExclusiveLock); WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index a5bd46e64..e0c04317b 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -270,13 +270,13 @@ citus_shard_sizes(PG_FUNCTION_ARGS) Datum citus_total_relation_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = PG_GETARG_BOOL(1); SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; @@ -301,12 +301,12 @@ citus_total_relation_size(PG_FUNCTION_ARGS) Datum citus_table_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = true; SizeQueryType sizeQueryType = TABLE_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; @@ -331,12 +331,12 @@ citus_table_size(PG_FUNCTION_ARGS) Datum citus_relation_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = true; SizeQueryType sizeQueryType = RELATION_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 0b4f03f93..f2305f644 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -161,6 +161,8 @@ DefaultNodeMetadata() Datum citus_set_coordinator_host(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -173,8 +175,6 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) Name nodeClusterName = PG_GETARG_NAME(3); nodeMetadata.nodeCluster = NameStr(*nodeClusterName); - CheckCitusVersion(ERROR); - /* prevent concurrent modification */ LockRelationOid(DistNodeRelationId(), RowShareLock); @@ -219,6 +219,8 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) Datum citus_add_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -227,8 +229,6 @@ citus_add_node(PG_FUNCTION_ARGS) bool nodeAlreadyExists = false; nodeMetadata.groupId = PG_GETARG_INT32(2); - CheckCitusVersion(ERROR); - /* * During tests this function is called before nodeRole and nodeCluster have been * created. @@ -288,6 +288,8 @@ master_add_node(PG_FUNCTION_ARGS) Datum citus_add_inactive_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -299,8 +301,6 @@ citus_add_inactive_node(PG_FUNCTION_ARGS) nodeMetadata.nodeRole = PG_GETARG_OID(3); nodeMetadata.nodeCluster = NameStr(*nodeClusterName); - CheckCitusVersion(ERROR); - if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) { ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node"))); @@ -331,6 +331,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS) Datum citus_add_secondary_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -348,8 +350,6 @@ citus_add_secondary_node(PG_FUNCTION_ARGS) nodeMetadata.nodeRole = SecondaryNodeRoleId(); nodeMetadata.isActive = true; - CheckCitusVersion(ERROR); - int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); TransactionModifiedNodeMetadata = true; @@ -380,11 +380,11 @@ master_add_secondary_node(PG_FUNCTION_ARGS) Datum citus_remove_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - CheckCitusVersion(ERROR); - RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort); TransactionModifiedNodeMetadata = true; @@ -631,7 +631,6 @@ static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort) { CheckCitusVersion(ERROR); - EnsureCoordinator(); /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ @@ -843,6 +842,8 @@ ActivateNode(char *nodeName, int nodePort) Datum citus_update_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + int32 nodeId = PG_GETARG_INT32(0); text *newNodeName = PG_GETARG_TEXT_P(1); @@ -864,8 +865,6 @@ citus_update_node(PG_FUNCTION_ARGS) List *placementList = NIL; BackgroundWorkerHandle *handle = NULL; - CheckCitusVersion(ERROR); - WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -1077,10 +1076,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort) Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) { - ShardInterval *shardInterval = NULL; - CheckCitusVersion(ERROR); + ShardInterval *shardInterval = NULL; + /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore * we need to check all parameters for NULL values. diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c index 9d9019a83..42fc5311f 100644 --- a/src/backend/distributed/operations/citus_create_restore_point.c +++ b/src/backend/distributed/operations/citus_create_restore_point.c @@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(citus_create_restore_point); Datum citus_create_restore_point(PG_FUNCTION_ARGS) { - text *restoreNameText = PG_GETARG_TEXT_P(0); - CheckCitusVersion(ERROR); EnsureSuperUser(); EnsureCoordinator(); + text *restoreNameText = PG_GETARG_TEXT_P(0); + if (RecoveryInProgress()) { ereport(ERROR, diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index 4035cf5e1..9b2eedefb 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -71,14 +71,14 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, Datum master_run_on_worker(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; bool parallelExecution = false; StringInfo *nodeNameArray = NULL; int *nodePortArray = NULL; StringInfo *commandStringArray = NULL; - CheckCitusVersion(ERROR); - /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) { diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 74e0022a4..65cba434e 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -64,6 +64,9 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards); Datum master_create_worker_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + text *tableNameText = PG_GETARG_TEXT_P(0); int32 shardCount = PG_GETARG_INT32(1); int32 replicationFactor = PG_GETARG_INT32(2); @@ -74,9 +77,6 @@ master_create_worker_shards(PG_FUNCTION_ARGS) /* do not add any data */ bool useExclusiveConnections = false; - EnsureCoordinator(); - CheckCitusVersion(ERROR); - /* * distributed tables might have dependencies on different objects, since we create * shards for a distributed table via multiple sessions these objects will be created diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index a175d741a..7543beaa6 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -109,6 +109,9 @@ PG_FUNCTION_INFO_V1(master_drop_sequences); Datum master_apply_delete_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); List *deletableShardIntervalList = NIL; @@ -116,9 +119,6 @@ master_apply_delete_command(PG_FUNCTION_ARGS) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); Node *queryTreeNode = rawStmt->stmt; - EnsureCoordinator(); - CheckCitusVersion(ERROR); - if (!IsA(queryTreeNode, DeleteStmt)) { ereport(ERROR, (errmsg("query \"%s\" is not a delete statement", @@ -208,6 +208,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS) Datum citus_drop_all_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *relationNameText = PG_GETARG_TEXT_P(2); @@ -215,8 +217,6 @@ citus_drop_all_shards(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *relationName = text_to_cstring(relationNameText); - CheckCitusVersion(ERROR); - /* * The SQL_DROP trigger calls this function even for tables that are * not distributed. In that case, silently ignore and return -1. diff --git a/src/backend/distributed/operations/modify_multiple_shards.c b/src/backend/distributed/operations/modify_multiple_shards.c index caac4cb64..cb740dabb 100644 --- a/src/backend/distributed/operations/modify_multiple_shards.c +++ b/src/backend/distributed/operations/modify_multiple_shards.c @@ -70,13 +70,13 @@ PG_FUNCTION_INFO_V1(master_modify_multiple_shards); Datum master_modify_multiple_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); Node *queryTreeNode = rawStmt->stmt; - CheckCitusVersion(ERROR); - if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt)) { ereport(ERROR, (errmsg("query \"%s\" is not a delete or update " diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index fc03a09d1..c52f6efcf 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -103,6 +103,8 @@ PG_FUNCTION_INFO_V1(master_stage_shard_placement_row); Datum master_get_table_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *relationName = PG_GETARG_TEXT_P(0); Oid relationId = ResolveRelationId(relationName, false); @@ -112,8 +114,6 @@ master_get_table_metadata(PG_FUNCTION_ARGS) Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; - CheckCitusVersion(ERROR); - /* find partition tuple for partitioned relation */ CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId); @@ -201,11 +201,11 @@ CStoreTable(Oid relationId) Datum master_get_table_ddl_events(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *tableDDLEventCell = NULL; - CheckCitusVersion(ERROR); - /* * On the very first call to this function, we first use the given relation * name to get to the relation. We then recreate the list of DDL statements @@ -276,8 +276,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) Datum master_get_new_shardid(PG_FUNCTION_ARGS) { - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); uint64 shardId = GetNextShardId(); Datum shardIdDatum = Int64GetDatum(shardId); @@ -346,8 +346,8 @@ GetNextShardId() Datum master_get_new_placementid(PG_FUNCTION_ARGS) { - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); uint64 placementId = GetNextPlacementId(); Datum placementIdDatum = Int64GetDatum(placementId); @@ -453,11 +453,11 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS) Datum citus_get_active_worker_nodes(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; uint32 workerNodeCount = 0; - CheckCitusVersion(ERROR); - if (SRF_IS_FIRSTCALL()) { /* create a function context for cross-call persistence */ diff --git a/src/backend/distributed/operations/partitioning.c b/src/backend/distributed/operations/partitioning.c index c7d989d5a..9e2057927 100644 --- a/src/backend/distributed/operations/partitioning.c +++ b/src/backend/distributed/operations/partitioning.c @@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(time_partition_range); Datum time_partition_range(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); + /* create tuple descriptor for return value */ TupleDesc metadataDescriptor = NULL; TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL, diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index cfc7010bb..59e87f74a 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -122,6 +122,9 @@ bool CheckAvailableSpaceBeforeMove = true; Datum citus_copy_shard_placement(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + int64 shardId = PG_GETARG_INT64(0); text *sourceNodeNameText = PG_GETARG_TEXT_P(1); int32 sourceNodePort = PG_GETARG_INT32(2); @@ -133,9 +136,6 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) char *sourceNodeName = text_to_cstring(sourceNodeNameText); char *targetNodeName = text_to_cstring(targetNodeNameText); - EnsureCoordinator(); - CheckCitusVersion(ERROR); - char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL) { @@ -283,6 +283,9 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes) Datum citus_move_shard_placement(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + int64 shardId = PG_GETARG_INT64(0); char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1)); int32 sourceNodePort = PG_GETARG_INT32(2); @@ -294,10 +297,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) ListCell *colocatedTableCell = NULL; ListCell *colocatedShardCell = NULL; - - CheckCitusVersion(ERROR); - EnsureCoordinator(); - Oid relationId = RelationIdForShard(shardId); ErrorIfMoveCitusLocalTable(relationId); ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort); diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 91c00d7c0..92f37ec05 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -547,6 +547,7 @@ GetShardCost(uint64 shardId, void *voidContext) Datum citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); uint64 shardId = PG_GETARG_INT64(0); bool missingOk = false; ShardPlacement *shardPlacement = ActiveShardPlacement(shardId, missingOk); @@ -784,6 +785,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) Datum rebalance_table_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *relationIdList = NIL; if (!PG_ARGISNULL(0)) { @@ -888,6 +890,7 @@ GetRebalanceStrategy(Name name) Datum citus_drain_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "nodename"); PG_ENSURE_ARGNOTNULL(1, "nodeport"); PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); @@ -932,6 +935,7 @@ citus_drain_node(PG_FUNCTION_ARGS) Datum replicate_table_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); uint32 shardReplicationFactor = PG_GETARG_INT32(1); int32 maxShardCopies = PG_GETARG_INT32(2); @@ -986,6 +990,7 @@ master_drain_node(PG_FUNCTION_ARGS) Datum get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *relationIdList = NIL; if (!PG_ARGISNULL(0)) { @@ -1067,6 +1072,7 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) Datum get_rebalance_progress(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *segmentList = NIL; ListCell *rebalanceMonitorCell = NULL; TupleDesc tupdesc; @@ -2811,6 +2817,7 @@ pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS) Datum citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); EnsureShardCostUDF(PG_GETARG_OID(0)); EnsureNodeCapacityUDF(PG_GETARG_OID(1)); EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2)); diff --git a/src/backend/distributed/operations/split_shards.c b/src/backend/distributed/operations/split_shards.c index 80d3f930f..df95faded 100644 --- a/src/backend/distributed/operations/split_shards.c +++ b/src/backend/distributed/operations/split_shards.c @@ -61,10 +61,10 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS) Datum worker_hash(PG_FUNCTION_ARGS) { - Datum valueDatum = PG_GETARG_DATUM(0); - CheckCitusVersion(ERROR); + Datum valueDatum = PG_GETARG_DATUM(0); + /* figure out hash function from the data type */ Oid valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0); TypeCacheEntry *typeEntry = lookup_type_cache(valueDataType, diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index ed48bdb0f..225317132 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(citus_update_table_statistics); Datum master_create_empty_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); uint32 attemptableNodeCount = 0; @@ -108,8 +110,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS) Oid relationId = ResolveRelationId(relationNameText, false); char relationKind = get_rel_relkind(relationId); - CheckCitusVersion(ERROR); - EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); @@ -239,6 +239,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) Datum master_append_table_to_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 shardId = PG_GETARG_INT64(0); text *sourceTableNameText = PG_GETARG_TEXT_P(1); text *sourceNodeNameText = PG_GETARG_TEXT_P(2); @@ -249,8 +251,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) float4 shardFillLevel = 0.0; - CheckCitusVersion(ERROR); - ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; @@ -359,10 +359,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) Datum citus_update_shard_statistics(PG_FUNCTION_ARGS) { - int64 shardId = PG_GETARG_INT64(0); - CheckCitusVersion(ERROR); + int64 shardId = PG_GETARG_INT64(0); + uint64 shardSize = UpdateShardStatistics(shardId); PG_RETURN_INT64(shardSize); @@ -376,10 +376,10 @@ citus_update_shard_statistics(PG_FUNCTION_ARGS) Datum citus_update_table_statistics(PG_FUNCTION_ARGS) { - Oid distributedTableId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); + Oid distributedTableId = PG_GETARG_OID(0); + UpdateTableStatistics(distributedTableId); PG_RETURN_VOID(); diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 1120422b7..e79ab6a14 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -901,13 +901,13 @@ AppendShardIdToName(char **name, uint64 shardId) Datum shard_name(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); int64 shardId = PG_GETARG_INT64(1); char *qualifiedName = NULL; - CheckCitusVersion(ERROR); - if (shardId <= 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 6becf863c..84739cb49 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -39,6 +39,8 @@ PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph); Datum get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; HASH_SEQ_STATUS status; @@ -47,8 +49,6 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) Datum values[2]; bool isNulls[2]; - CheckCitusVersion(ERROR); - Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); WaitGraph *waitGraph = BuildGlobalWaitGraph(); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); diff --git a/src/backend/distributed/test/foreign_key_relationship_query.c b/src/backend/distributed/test/foreign_key_relationship_query.c index a03856f54..bae2e77e0 100644 --- a/src/backend/distributed/test/foreign_key_relationship_query.c +++ b/src/backend/distributed/test/foreign_key_relationship_query.c @@ -77,11 +77,11 @@ drop_constraint_cascade_via_perform_deletion(PG_FUNCTION_ARGS) Datum get_referencing_relation_id_list(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *foreignRelationCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { @@ -136,11 +136,11 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS) Datum get_referenced_relation_id_list(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *foreignRelationCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { diff --git a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c b/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c index 4c5a21379..941c3ad6a 100644 --- a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c +++ b/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c @@ -30,11 +30,11 @@ PG_FUNCTION_INFO_V1(get_foreign_key_to_reference_table_commands); Datum get_foreign_key_to_reference_table_commands(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *commandsCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { diff --git a/src/backend/distributed/test/intermediate_results.c b/src/backend/distributed/test/intermediate_results.c index 22624eb8f..b4f14bca6 100644 --- a/src/backend/distributed/test/intermediate_results.c +++ b/src/backend/distributed/test/intermediate_results.c @@ -34,6 +34,8 @@ PG_FUNCTION_INFO_V1(store_intermediate_result_on_node); Datum store_intermediate_result_on_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeNameText = PG_GETARG_TEXT_P(0); char *nodeNameString = text_to_cstring(nodeNameText); int nodePort = PG_GETARG_INT32(1); @@ -44,8 +46,6 @@ store_intermediate_result_on_node(PG_FUNCTION_ARGS) bool writeLocalFile = false; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - WorkerNode *workerNode = FindWorkerNodeOrError(nodeNameString, nodePort); /* diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index 14b16be18..e0b7d806c 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -83,13 +83,13 @@ AllowNonIdleTransactionOnXactHandling(void) Datum start_session_level_connection_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); uint32 nodePort = PG_GETARG_UINT32(1); char *nodeNameString = text_to_cstring(nodeName); int connectionFlags = 0; - CheckCitusVersion(ERROR); - if (singleConnection != NULL && (strcmp(singleConnection->hostname, nodeNameString) != 0 || singleConnection->port != nodePort)) diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 8026e50c1..45c2ff2a5 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -106,6 +106,8 @@ PG_FUNCTION_INFO_V1(get_all_active_transactions); Datum assign_distributed_transaction_id(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid userId = GetUserId(); /* prepare data before acquiring spinlock to protect against errors */ @@ -113,8 +115,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) uint64 transactionNumber = PG_GETARG_INT64(1); TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2); - CheckCitusVersion(ERROR); - /* MyBackendData should always be avaliable, just out of paranoia */ if (!MyBackendData) { @@ -166,14 +166,14 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) Datum get_current_transaction_id(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; Datum values[5]; bool isNulls[5]; - CheckCitusVersion(ERROR); - /* build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) { @@ -225,12 +225,13 @@ get_current_transaction_id(PG_FUNCTION_ARGS) Datum get_global_active_transactions(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; StringInfo queryToSend = makeStringInfo(); - CheckCitusVersion(ERROR); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY); @@ -336,9 +337,9 @@ get_global_active_transactions(PG_FUNCTION_ARGS) Datum get_all_active_transactions(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllActiveTransactions(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index b674350ca..91c9c2882 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -70,6 +70,9 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation); Datum mark_tables_colocated(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + Oid sourceRelationId = PG_GETARG_OID(0); ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); @@ -80,8 +83,6 @@ mark_tables_colocated(PG_FUNCTION_ARGS) "operation"))); } - CheckCitusVersion(ERROR); - EnsureCoordinator(); EnsureTableOwner(sourceRelationId); Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); @@ -108,11 +109,12 @@ mark_tables_colocated(PG_FUNCTION_ARGS) Datum update_distributed_table_colocation(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + Oid targetRelationId = PG_GETARG_OID(0); text *colocateWithTableNameText = PG_GETARG_TEXT_P(1); - CheckCitusVersion(ERROR); - EnsureCoordinator(); EnsureTableOwner(targetRelationId); char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 2008eddb0..947740096 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(column_to_column_name); Datum column_name_to_column(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *columnText = PG_GETARG_TEXT_P(1); char *columnName = text_to_cstring(columnText); - CheckCitusVersion(ERROR); - Relation relation = relation_open(relationId, AccessShareLock); Var *column = BuildDistributionKeyFromColumnName(relation, columnName); @@ -100,13 +100,13 @@ column_name_to_column_id(PG_FUNCTION_ARGS) Datum column_to_column_name(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *columnNodeText = PG_GETARG_TEXT_P(1); char *columnNodeString = text_to_cstring(columnNodeText); - CheckCitusVersion(ERROR); - char *columnName = ColumnToColumnName(relationId, columnNodeString); text *columnText = cstring_to_text(columnName); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 602157339..3861c2ac1 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -97,11 +97,11 @@ PG_FUNCTION_INFO_V1(lock_relation_if_exists); Datum lock_shard_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); - CheckCitusVersion(ERROR); - if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); @@ -134,11 +134,11 @@ lock_shard_metadata(PG_FUNCTION_ARGS) Datum lock_shard_resources(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); - CheckCitusVersion(ERROR); - if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 823a5035f..f41e60f21 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(master_expire_table_cache); Datum worker_fetch_partition_file(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 partitionTaskId = PG_GETARG_UINT32(1); uint32 partitionFileId = PG_GETARG_UINT32(2); @@ -115,8 +117,6 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS) */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); - CheckCitusVersion(ERROR); - if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -383,6 +383,8 @@ CitusDeleteFile(const char *filename) Datum worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 shardId = PG_GETARG_INT64(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *ddlCommandText = PG_GETARG_TEXT_P(2); @@ -391,8 +393,6 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); - CheckCitusVersion(ERROR); - /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_QUERY, NULL, @@ -410,6 +410,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 leftShardId = PG_GETARG_INT64(0); text *leftShardSchemaNameText = PG_GETARG_TEXT_P(1); uint64 rightShardId = PG_GETARG_INT64(2); @@ -421,8 +423,6 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); - CheckCitusVersion(ERROR); - /* extend names in ddl command and apply extended command */ RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, @@ -443,6 +443,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_apply_sequence_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *commandText = PG_GETARG_TEXT_P(0); Oid sequenceTypeId = PG_GETARG_OID(1); const char *commandString = text_to_cstring(commandText); @@ -450,8 +452,6 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) NodeTag nodeType = nodeTag(commandNode); - CheckCitusVersion(ERROR); - if (nodeType != T_CreateSeqStmt) { ereport(ERROR, @@ -579,6 +579,8 @@ ParseTreeRawStmt(const char *ddlCommand) Datum worker_append_table_to_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *shardQualifiedNameText = PG_GETARG_TEXT_P(0); text *sourceQualifiedNameText = PG_GETARG_TEXT_P(1); text *sourceNodeNameText = PG_GETARG_TEXT_P(2); @@ -596,8 +598,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) Oid savedUserId = InvalidOid; int savedSecurityContext = 0; - CheckCitusVersion(ERROR); - /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 682a2d95c..165eb13d1 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -49,15 +49,15 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table); Datum worker_drop_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureSuperUser(); + text *relationName = PG_GETARG_TEXT_P(0); Oid relationId = ResolveRelationId(relationName, true); ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 }; char relationKind = '\0'; - CheckCitusVersion(ERROR); - EnsureSuperUser(); - if (!OidIsValid(relationId)) { ereport(NOTICE, (errmsg("relation %s does not exist, skipping", diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c index b2a473dea..5a5535560 100644 --- a/src/backend/distributed/worker/worker_file_access_protocol.c +++ b/src/backend/distributed/worker/worker_file_access_protocol.c @@ -35,13 +35,13 @@ PG_FUNCTION_INFO_V1(worker_find_block_local_path); Datum worker_foreign_file_path(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *foreignTableName = PG_GETARG_TEXT_P(0); text *foreignFilePath = NULL; Oid relationId = ResolveRelationId(foreignTableName, false); ForeignTable *foreignTable = GetForeignTable(relationId); - CheckCitusVersion(ERROR); - DefElem *option = NULL; foreach_ptr(option, foreignTable->options) { @@ -75,6 +75,8 @@ worker_foreign_file_path(PG_FUNCTION_ARGS) Datum worker_find_block_local_path(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + int64 blockId = PG_GETARG_INT64(0); ArrayType *dataDirectoryObject = PG_GETARG_ARRAYTYPE_P(1); @@ -82,8 +84,6 @@ worker_find_block_local_path(PG_FUNCTION_ARGS) (void) blockId; (void) dataDirectoryObject; - CheckCitusVersion(ERROR); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("called function is currently unsupported"))); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 8b1f07870..a539a9c90 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -68,14 +68,14 @@ PG_FUNCTION_INFO_V1(worker_repartition_cleanup); Datum worker_create_schema(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); text *ownerText = PG_GETARG_TEXT_P(1); char *ownerString = TextDatumGetCString(ownerText); StringInfo jobSchemaName = JobSchemaName(jobId); - CheckCitusVersion(ERROR); - bool schemaExists = JobSchemaExists(jobSchemaName); if (!schemaExists) { @@ -144,12 +144,12 @@ CreateJobSchema(StringInfo schemaName, char *schemaOwner) Datum worker_repartition_cleanup(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); StringInfo jobDirectoryName = JobDirectoryName(jobId); StringInfo jobSchemaName = JobSchemaName(jobId); - CheckCitusVersion(ERROR); - Oid schemaId = get_namespace_oid(jobSchemaName->data, false); EnsureSchemaOwner(schemaId); @@ -173,6 +173,8 @@ worker_repartition_cleanup(PG_FUNCTION_ARGS) Datum worker_merge_files_into_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); ArrayType *columnNameObject = PG_GETARG_ARRAYTYPE_P(2); @@ -189,8 +191,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) int32 columnNameCount = ArrayObjectCount(columnNameObject); int32 columnTypeCount = ArrayObjectCount(columnTypeObject); - CheckCitusVersion(ERROR); - if (columnNameCount != columnTypeCount) { ereport(ERROR, (errmsg("column name array size: %d and type array size: %d" @@ -264,11 +264,11 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) Datum worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ScanKey scanKey = NULL; int scanKeyCount = 0; - CheckCitusVersion(ERROR); - Relation pgNamespace = table_open(NamespaceRelationId, AccessExclusiveLock); TableScanDesc scanDescriptor = table_beginscan_catalog(pgNamespace, scanKeyCount, scanKey); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 0da3a5bb0..7dc6c75ee 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -109,6 +109,8 @@ PG_FUNCTION_INFO_V1(worker_hash_partition_table); Datum worker_range_partition_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); text *filterQueryText = PG_GETARG_TEXT_P(2); @@ -130,8 +132,6 @@ worker_range_partition_table(PG_FUNCTION_ARGS) /* first check that array element's and partition column's types match */ Oid splitPointType = ARR_ELEMTYPE(splitPointObject); - CheckCitusVersion(ERROR); - if (splitPointType != partitionColumnType) { ereport(ERROR, (errmsg("partition column type %u and split point type %u " @@ -188,6 +188,8 @@ worker_range_partition_table(PG_FUNCTION_ARGS) Datum worker_hash_partition_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); text *filterQueryText = PG_GETARG_TEXT_P(2); @@ -209,8 +211,6 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject); int32 partitionCount = ArrayObjectCount(hashRangeObject); - CheckCitusVersion(ERROR); - HashPartitionContext *partitionContext = palloc0(sizeof(HashPartitionContext)); partitionContext->syntheticShardIntervalArray = SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount); diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index e0c0d0747..9f11ebf6d 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -38,11 +38,11 @@ PG_FUNCTION_INFO_V1(relation_is_a_known_shard); Datum relation_is_a_known_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool onlySearchPath = true; - CheckCitusVersion(ERROR); - PG_RETURN_BOOL(RelationIsAKnownShard(relationId, onlySearchPath)); } @@ -55,12 +55,12 @@ relation_is_a_known_shard(PG_FUNCTION_ARGS) Datum citus_table_is_visible(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); char relKind = '\0'; bool onlySearchPath = true; - CheckCitusVersion(ERROR); - /* * We don't want to deal with not valid/existing relations * as pg_table_is_visible does. diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c index c4149dca2..9c7c5af78 100644 --- a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c +++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c @@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(worker_create_truncate_trigger); Datum worker_create_truncate_trigger(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - - EnsureSuperUser(); CheckCitusVersion(ERROR); + EnsureSuperUser(); + + Oid relationId = PG_GETARG_OID(0); /* Create the truncate trigger */ CreateTruncateTrigger(relationId);