From b1cad26ebcc05674651101f4f8f56296d84ca63f Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 1 Jun 2021 11:39:14 +0200 Subject: [PATCH] Move CheckCitusVersion to the top of each function Previously this was usually done after argument parsing. This can cause SEGFAULTs if the number or type of arguments changes in a new version. By checking that Citus version is correct before doing any argument parsing we protect against these types of issues. Issues like this have occurred in pg_auto_failover, so it's not just a theoretical issue. The main reason why these calls were not at the top of functions is really just historical. It was because in the past we didn't allow statements before declarations. Thus having this check before the argument parsing would have only been possible if we first declared all variables. In addition to moving existing CheckCitusVersion calls it also adds these calls to rebalancer related functions (they were missing there). --- .../distributed/commands/alter_table.c | 13 ++++----- .../commands/create_distributed_table.c | 10 +++---- .../commands/drop_distributed_table.c | 8 ++--- src/backend/distributed/commands/truncate.c | 5 ++-- .../locally_reserved_shared_connections.c | 4 +-- .../connection/shared_connection_stats.c | 4 +-- .../executor/intermediate_results.c | 20 ++++++------- .../partitioned_intermediate_results.c | 4 +-- .../distributed/metadata/metadata_cache.c | 28 +++++++++--------- .../distributed/metadata/metadata_sync.c | 12 ++++---- .../distributed/metadata/metadata_utility.c | 12 ++++---- .../distributed/metadata/node_metadata.c | 29 +++++++++---------- .../operations/citus_create_restore_point.c | 4 +-- .../distributed/operations/citus_tools.c | 4 +-- .../distributed/operations/create_shards.c | 6 ++-- .../distributed/operations/delete_protocol.c | 10 +++---- .../operations/modify_multiple_shards.c | 4 +-- .../distributed/operations/node_protocol.c | 16 +++++----- .../distributed/operations/partitioning.c | 4 +-- .../distributed/operations/repair_shards.c | 13 ++++----- .../distributed/operations/shard_rebalancer.c | 7 +++++ .../distributed/operations/split_shards.c | 4 +-- .../distributed/operations/stage_protocol.c | 16 +++++----- .../distributed/relay/relay_event_utility.c | 4 +-- .../test/distributed_deadlock_detection.c | 4 +-- .../test/foreign_key_relationship_query.c | 8 ++--- ...foreign_key_to_reference_table_rebalance.c | 4 +-- .../distributed/test/intermediate_results.c | 4 +-- .../test/run_from_same_connection.c | 4 +-- .../distributed/transaction/backend_data.c | 15 +++++----- .../distributed/utils/colocation_utils.c | 10 ++++--- .../distributed/utils/distribution_column.c | 8 ++--- src/backend/distributed/utils/resource_lock.c | 8 ++--- .../worker/worker_data_fetch_protocol.c | 20 ++++++------- .../distributed/worker/worker_drop_protocol.c | 6 ++-- .../worker/worker_file_access_protocol.c | 8 ++--- .../worker/worker_merge_protocol.c | 16 +++++----- .../worker/worker_partition_protocol.c | 8 ++--- .../worker/worker_shard_visibility.c | 8 ++--- .../worker/worker_truncate_trigger_protocol.c | 6 ++-- 40 files changed, 193 insertions(+), 185 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 84e47eac7..17b4264ba 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -219,11 +219,11 @@ PG_FUNCTION_INFO_V1(worker_change_sequence_dependency); Datum undistribute_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool cascadeViaForeignKeys = PG_GETARG_BOOL(1); - CheckCitusVersion(ERROR); - TableConversionParameters params = { .relationId = relationId, .cascadeViaForeignKeys = cascadeViaForeignKeys @@ -243,6 +243,8 @@ undistribute_table(PG_FUNCTION_ARGS) Datum alter_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); char *distributionColumn = NULL; @@ -280,9 +282,6 @@ alter_distributed_table(PG_FUNCTION_ARGS) } } - CheckCitusVersion(ERROR); - - TableConversionParameters params = { .relationId = relationId, .distributionColumn = distributionColumn, @@ -305,13 +304,13 @@ alter_distributed_table(PG_FUNCTION_ARGS) Datum alter_table_set_access_method(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *accessMethodText = PG_GETARG_TEXT_P(1); char *accessMethod = text_to_cstring(accessMethodText); - CheckCitusVersion(ERROR); - TableConversionParameters params = { .relationId = relationId, .accessMethod = accessMethod diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 2f03ccb17..1120a5b92 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -142,12 +142,11 @@ PG_FUNCTION_INFO_V1(create_reference_table); Datum master_create_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); char *colocateWithTableName = NULL; @@ -189,6 +188,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3)) { PG_RETURN_VOID(); @@ -221,8 +222,6 @@ create_distributed_table(PG_FUNCTION_ARGS) shardCountIsStrict = true; } - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); /* enable create_distributed_table on an empty node */ @@ -271,6 +270,7 @@ create_distributed_table(PG_FUNCTION_ARGS) Datum create_reference_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); char *colocateWithTableName = NULL; @@ -278,8 +278,6 @@ create_reference_table(PG_FUNCTION_ARGS) bool viaDeprecatedAPI = false; - CheckCitusVersion(ERROR); - EnsureCitusTableCanBeCreated(relationId); /* enable create_reference_table on an empty node */ diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c index 536e27206..79adf02a9 100644 --- a/src/backend/distributed/commands/drop_distributed_table.c +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -59,6 +59,8 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS) Datum master_remove_partition_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *tableNameText = PG_GETARG_TEXT_P(2); @@ -66,8 +68,6 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *tableName = text_to_cstring(tableNameText); - CheckCitusVersion(ERROR); - /* * The SQL_DROP trigger calls this function even for tables that are * not distributed. In that case, silently ignore. This is not very @@ -97,6 +97,8 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS) Datum master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *tableNameText = PG_GETARG_TEXT_P(2); @@ -104,8 +106,6 @@ master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *tableName = text_to_cstring(tableNameText); - CheckCitusVersion(ERROR); - CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName); MasterRemoveDistributedTableMetadataFromWorkers(relationId, schemaName, tableName); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index 03a2c98f3..185f788e5 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -174,10 +174,11 @@ TruncateTaskList(Oid relationId) Datum truncate_local_data_after_distributing_table(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); EnsureCoordinator(); + + Oid relationId = PG_GETARG_OID(0); + EnsureLocalTableCanBeTruncated(relationId); TruncateStmt *truncateStmt = makeNode(TruncateStmt); diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index 19bc93ae6..efe14b2ad 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -111,9 +111,9 @@ PG_FUNCTION_INFO_V1(citus_reserved_connection_stats); Datum citus_reserved_connection_stats(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllReservedConnections(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 649f2e9b5..89fb1cd19 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -136,9 +136,9 @@ PG_FUNCTION_INFO_V1(citus_remote_connection_stats); Datum citus_remote_connection_stats(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c45e64821..c1b8f86dc 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -118,6 +118,8 @@ PG_FUNCTION_INFO_V1(fetch_intermediate_results); Datum broadcast_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *resultIdText = PG_GETARG_TEXT_P(0); char *resultIdString = text_to_cstring(resultIdText); text *queryText = PG_GETARG_TEXT_P(1); @@ -125,8 +127,6 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) bool writeLocalFile = false; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - /* * Make sure that this transaction has a distributed transaction ID. * @@ -159,6 +159,8 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) Datum create_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *resultIdText = PG_GETARG_TEXT_P(0); char *resultIdString = text_to_cstring(resultIdText); text *queryText = PG_GETARG_TEXT_P(1); @@ -167,8 +169,6 @@ create_intermediate_result(PG_FUNCTION_ARGS) bool writeLocalFile = true; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - /* * Make sure that this transaction has a distributed transaction ID. * @@ -771,13 +771,13 @@ IntermediateResultSize(const char *resultId) Datum read_intermediate_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Datum resultId = PG_GETARG_DATUM(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); - CheckCitusVersion(ERROR); - ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1); PG_RETURN_DATUM(0); @@ -794,14 +794,14 @@ read_intermediate_result(PG_FUNCTION_ARGS) Datum read_intermediate_result_array(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); - CheckCitusVersion(ERROR); - int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS( resultIdObject)); Datum *resultIdArray = DeconstructArrayObject(resultIdObject); @@ -874,6 +874,8 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, Datum fetch_intermediate_results(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); Datum *resultIdArray = DeconstructArrayObject(resultIdObject); int32 resultCount = ArrayObjectCount(resultIdObject); @@ -885,8 +887,6 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) int resultIndex = 0; int64 totalBytesWritten = 0L; - CheckCitusVersion(ERROR); - if (resultCount == 0) { PG_RETURN_INT64(0); diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 3b481a231..c0f6e9d65 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -107,6 +107,8 @@ PG_FUNCTION_INFO_V1(worker_partition_query_result); Datum worker_partition_query_result(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo; text *resultIdPrefixText = PG_GETARG_TEXT_P(0); @@ -136,8 +138,6 @@ worker_partition_query_result(PG_FUNCTION_ARGS) bool binaryCopy = PG_GETARG_BOOL(6); - CheckCitusVersion(ERROR); - if (!IsMultiStatementTransaction()) { ereport(ERROR, (errmsg("worker_partition_query_result can only be used in a " diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index aa5a61886..144334852 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -2641,6 +2641,8 @@ SecondaryNodeRoleId(void) Datum citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldLogicalRelationId = InvalidOid; Oid newLogicalRelationId = InvalidOid; @@ -2651,8 +2653,6 @@ citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - if (RelationGetRelid(triggerData->tg_relation) != DistPartitionRelationId()) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), @@ -2718,6 +2718,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldLogicalRelationId = InvalidOid; Oid newLogicalRelationId = InvalidOid; @@ -2728,8 +2730,6 @@ citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - if (RelationGetRelid(triggerData->tg_relation) != DistShardRelationId()) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), @@ -2795,6 +2795,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TriggerData *triggerData = (TriggerData *) fcinfo->context; Oid oldShardId = InvalidOid; Oid newShardId = InvalidOid; @@ -2805,8 +2807,6 @@ citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - /* * Before 7.0-2 this trigger is on pg_dist_shard_placement, * ignore trigger in this scenario. @@ -2884,14 +2884,14 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_node_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistNodeRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2919,14 +2919,14 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_conninfo_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - /* no-op in community edition */ PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2954,14 +2954,14 @@ master_dist_authinfo_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); @@ -2989,14 +2989,14 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS) Datum citus_dist_object_cache_invalidate(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), errmsg("must be called as trigger"))); } - CheckCitusVersion(ERROR); - CitusInvalidateRelcacheByRelid(DistObjectRelationId()); PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 686a206b4..48176495d 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -103,6 +103,8 @@ static bool got_SIGALRM = false; Datum start_metadata_sync_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); @@ -126,10 +128,10 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) /* fail if metadata synchronization doesn't succeed */ bool raiseInterrupts = true; + CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureSuperUser(); EnsureModificationsCanRun(); - CheckCitusVersion(ERROR); PreventInTransactionBlock(true, "start_metadata_sync_to_node"); @@ -185,14 +187,14 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) Datum stop_metadata_sync_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + EnsureSuperUser(); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - EnsureCoordinator(); - EnsureSuperUser(); - CheckCitusVersion(ERROR); - LockRelationOid(DistNodeRelationId(), ExclusiveLock); WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index a5bd46e64..e0c04317b 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -270,13 +270,13 @@ citus_shard_sizes(PG_FUNCTION_ARGS) Datum citus_total_relation_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = PG_GETARG_BOOL(1); SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; @@ -301,12 +301,12 @@ citus_total_relation_size(PG_FUNCTION_ARGS) Datum citus_table_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = true; SizeQueryType sizeQueryType = TABLE_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; @@ -331,12 +331,12 @@ citus_table_size(PG_FUNCTION_ARGS) Datum citus_relation_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool failOnError = true; SizeQueryType sizeQueryType = RELATION_SIZE; - CheckCitusVersion(ERROR); - if (CStoreTable(relationId)) { sizeQueryType = CSTORE_TABLE_SIZE; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 0b4f03f93..f2305f644 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -161,6 +161,8 @@ DefaultNodeMetadata() Datum citus_set_coordinator_host(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -173,8 +175,6 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) Name nodeClusterName = PG_GETARG_NAME(3); nodeMetadata.nodeCluster = NameStr(*nodeClusterName); - CheckCitusVersion(ERROR); - /* prevent concurrent modification */ LockRelationOid(DistNodeRelationId(), RowShareLock); @@ -219,6 +219,8 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) Datum citus_add_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -227,8 +229,6 @@ citus_add_node(PG_FUNCTION_ARGS) bool nodeAlreadyExists = false; nodeMetadata.groupId = PG_GETARG_INT32(2); - CheckCitusVersion(ERROR); - /* * During tests this function is called before nodeRole and nodeCluster have been * created. @@ -288,6 +288,8 @@ master_add_node(PG_FUNCTION_ARGS) Datum citus_add_inactive_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -299,8 +301,6 @@ citus_add_inactive_node(PG_FUNCTION_ARGS) nodeMetadata.nodeRole = PG_GETARG_OID(3); nodeMetadata.nodeCluster = NameStr(*nodeClusterName); - CheckCitusVersion(ERROR); - if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) { ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node"))); @@ -331,6 +331,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS) Datum citus_add_secondary_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -348,8 +350,6 @@ citus_add_secondary_node(PG_FUNCTION_ARGS) nodeMetadata.nodeRole = SecondaryNodeRoleId(); nodeMetadata.isActive = true; - CheckCitusVersion(ERROR); - int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); TransactionModifiedNodeMetadata = true; @@ -380,11 +380,11 @@ master_add_secondary_node(PG_FUNCTION_ARGS) Datum citus_remove_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - CheckCitusVersion(ERROR); - RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort); TransactionModifiedNodeMetadata = true; @@ -631,7 +631,6 @@ static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort) { CheckCitusVersion(ERROR); - EnsureCoordinator(); /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ @@ -843,6 +842,8 @@ ActivateNode(char *nodeName, int nodePort) Datum citus_update_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + int32 nodeId = PG_GETARG_INT32(0); text *newNodeName = PG_GETARG_TEXT_P(1); @@ -864,8 +865,6 @@ citus_update_node(PG_FUNCTION_ARGS) List *placementList = NIL; BackgroundWorkerHandle *handle = NULL; - CheckCitusVersion(ERROR); - WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -1077,10 +1076,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort) Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) { - ShardInterval *shardInterval = NULL; - CheckCitusVersion(ERROR); + ShardInterval *shardInterval = NULL; + /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore * we need to check all parameters for NULL values. diff --git a/src/backend/distributed/operations/citus_create_restore_point.c b/src/backend/distributed/operations/citus_create_restore_point.c index 9d9019a83..42fc5311f 100644 --- a/src/backend/distributed/operations/citus_create_restore_point.c +++ b/src/backend/distributed/operations/citus_create_restore_point.c @@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(citus_create_restore_point); Datum citus_create_restore_point(PG_FUNCTION_ARGS) { - text *restoreNameText = PG_GETARG_TEXT_P(0); - CheckCitusVersion(ERROR); EnsureSuperUser(); EnsureCoordinator(); + text *restoreNameText = PG_GETARG_TEXT_P(0); + if (RecoveryInProgress()) { ereport(ERROR, diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index 4035cf5e1..9b2eedefb 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -71,14 +71,14 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, Datum master_run_on_worker(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; bool parallelExecution = false; StringInfo *nodeNameArray = NULL; int *nodePortArray = NULL; StringInfo *commandStringArray = NULL; - CheckCitusVersion(ERROR); - /* check to see if caller supports us returning a tuplestore */ if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) { diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 74e0022a4..65cba434e 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -64,6 +64,9 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards); Datum master_create_worker_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + text *tableNameText = PG_GETARG_TEXT_P(0); int32 shardCount = PG_GETARG_INT32(1); int32 replicationFactor = PG_GETARG_INT32(2); @@ -74,9 +77,6 @@ master_create_worker_shards(PG_FUNCTION_ARGS) /* do not add any data */ bool useExclusiveConnections = false; - EnsureCoordinator(); - CheckCitusVersion(ERROR); - /* * distributed tables might have dependencies on different objects, since we create * shards for a distributed table via multiple sessions these objects will be created diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index a175d741a..7543beaa6 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -109,6 +109,9 @@ PG_FUNCTION_INFO_V1(master_drop_sequences); Datum master_apply_delete_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); List *deletableShardIntervalList = NIL; @@ -116,9 +119,6 @@ master_apply_delete_command(PG_FUNCTION_ARGS) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); Node *queryTreeNode = rawStmt->stmt; - EnsureCoordinator(); - CheckCitusVersion(ERROR); - if (!IsA(queryTreeNode, DeleteStmt)) { ereport(ERROR, (errmsg("query \"%s\" is not a delete statement", @@ -208,6 +208,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS) Datum citus_drop_all_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *relationNameText = PG_GETARG_TEXT_P(2); @@ -215,8 +217,6 @@ citus_drop_all_shards(PG_FUNCTION_ARGS) char *schemaName = text_to_cstring(schemaNameText); char *relationName = text_to_cstring(relationNameText); - CheckCitusVersion(ERROR); - /* * The SQL_DROP trigger calls this function even for tables that are * not distributed. In that case, silently ignore and return -1. diff --git a/src/backend/distributed/operations/modify_multiple_shards.c b/src/backend/distributed/operations/modify_multiple_shards.c index caac4cb64..cb740dabb 100644 --- a/src/backend/distributed/operations/modify_multiple_shards.c +++ b/src/backend/distributed/operations/modify_multiple_shards.c @@ -70,13 +70,13 @@ PG_FUNCTION_INFO_V1(master_modify_multiple_shards); Datum master_modify_multiple_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); Node *queryTreeNode = rawStmt->stmt; - CheckCitusVersion(ERROR); - if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt)) { ereport(ERROR, (errmsg("query \"%s\" is not a delete or update " diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index fc03a09d1..c52f6efcf 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -103,6 +103,8 @@ PG_FUNCTION_INFO_V1(master_stage_shard_placement_row); Datum master_get_table_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *relationName = PG_GETARG_TEXT_P(0); Oid relationId = ResolveRelationId(relationName, false); @@ -112,8 +114,6 @@ master_get_table_metadata(PG_FUNCTION_ARGS) Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; - CheckCitusVersion(ERROR); - /* find partition tuple for partitioned relation */ CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId); @@ -201,11 +201,11 @@ CStoreTable(Oid relationId) Datum master_get_table_ddl_events(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *tableDDLEventCell = NULL; - CheckCitusVersion(ERROR); - /* * On the very first call to this function, we first use the given relation * name to get to the relation. We then recreate the list of DDL statements @@ -276,8 +276,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) Datum master_get_new_shardid(PG_FUNCTION_ARGS) { - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); uint64 shardId = GetNextShardId(); Datum shardIdDatum = Int64GetDatum(shardId); @@ -346,8 +346,8 @@ GetNextShardId() Datum master_get_new_placementid(PG_FUNCTION_ARGS) { - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); uint64 placementId = GetNextPlacementId(); Datum placementIdDatum = Int64GetDatum(placementId); @@ -453,11 +453,11 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS) Datum citus_get_active_worker_nodes(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; uint32 workerNodeCount = 0; - CheckCitusVersion(ERROR); - if (SRF_IS_FIRSTCALL()) { /* create a function context for cross-call persistence */ diff --git a/src/backend/distributed/operations/partitioning.c b/src/backend/distributed/operations/partitioning.c index c7d989d5a..9e2057927 100644 --- a/src/backend/distributed/operations/partitioning.c +++ b/src/backend/distributed/operations/partitioning.c @@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(time_partition_range); Datum time_partition_range(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); + /* create tuple descriptor for return value */ TupleDesc metadataDescriptor = NULL; TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL, diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index cfc7010bb..59e87f74a 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -122,6 +122,9 @@ bool CheckAvailableSpaceBeforeMove = true; Datum citus_copy_shard_placement(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + int64 shardId = PG_GETARG_INT64(0); text *sourceNodeNameText = PG_GETARG_TEXT_P(1); int32 sourceNodePort = PG_GETARG_INT32(2); @@ -133,9 +136,6 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) char *sourceNodeName = text_to_cstring(sourceNodeNameText); char *targetNodeName = text_to_cstring(targetNodeNameText); - EnsureCoordinator(); - CheckCitusVersion(ERROR); - char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL) { @@ -283,6 +283,9 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes) Datum citus_move_shard_placement(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + int64 shardId = PG_GETARG_INT64(0); char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1)); int32 sourceNodePort = PG_GETARG_INT32(2); @@ -294,10 +297,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) ListCell *colocatedTableCell = NULL; ListCell *colocatedShardCell = NULL; - - CheckCitusVersion(ERROR); - EnsureCoordinator(); - Oid relationId = RelationIdForShard(shardId); ErrorIfMoveCitusLocalTable(relationId); ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort); diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 91c00d7c0..92f37ec05 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -547,6 +547,7 @@ GetShardCost(uint64 shardId, void *voidContext) Datum citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); uint64 shardId = PG_GETARG_INT64(0); bool missingOk = false; ShardPlacement *shardPlacement = ActiveShardPlacement(shardId, missingOk); @@ -784,6 +785,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId) Datum rebalance_table_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *relationIdList = NIL; if (!PG_ARGISNULL(0)) { @@ -888,6 +890,7 @@ GetRebalanceStrategy(Name name) Datum citus_drain_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "nodename"); PG_ENSURE_ARGNOTNULL(1, "nodeport"); PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); @@ -932,6 +935,7 @@ citus_drain_node(PG_FUNCTION_ARGS) Datum replicate_table_shards(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); uint32 shardReplicationFactor = PG_GETARG_INT32(1); int32 maxShardCopies = PG_GETARG_INT32(2); @@ -986,6 +990,7 @@ master_drain_node(PG_FUNCTION_ARGS) Datum get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *relationIdList = NIL; if (!PG_ARGISNULL(0)) { @@ -1067,6 +1072,7 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) Datum get_rebalance_progress(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); List *segmentList = NIL; ListCell *rebalanceMonitorCell = NULL; TupleDesc tupdesc; @@ -2811,6 +2817,7 @@ pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS) Datum citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); EnsureShardCostUDF(PG_GETARG_OID(0)); EnsureNodeCapacityUDF(PG_GETARG_OID(1)); EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2)); diff --git a/src/backend/distributed/operations/split_shards.c b/src/backend/distributed/operations/split_shards.c index 80d3f930f..df95faded 100644 --- a/src/backend/distributed/operations/split_shards.c +++ b/src/backend/distributed/operations/split_shards.c @@ -61,10 +61,10 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS) Datum worker_hash(PG_FUNCTION_ARGS) { - Datum valueDatum = PG_GETARG_DATUM(0); - CheckCitusVersion(ERROR); + Datum valueDatum = PG_GETARG_DATUM(0); + /* figure out hash function from the data type */ Oid valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0); TypeCacheEntry *typeEntry = lookup_type_cache(valueDataType, diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index ed48bdb0f..225317132 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(citus_update_table_statistics); Datum master_create_empty_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); uint32 attemptableNodeCount = 0; @@ -108,8 +110,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS) Oid relationId = ResolveRelationId(relationNameText, false); char relationKind = get_rel_relkind(relationId); - CheckCitusVersion(ERROR); - EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); @@ -239,6 +239,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) Datum master_append_table_to_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 shardId = PG_GETARG_INT64(0); text *sourceTableNameText = PG_GETARG_TEXT_P(1); text *sourceNodeNameText = PG_GETARG_TEXT_P(2); @@ -249,8 +251,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) float4 shardFillLevel = 0.0; - CheckCitusVersion(ERROR); - ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; @@ -359,10 +359,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) Datum citus_update_shard_statistics(PG_FUNCTION_ARGS) { - int64 shardId = PG_GETARG_INT64(0); - CheckCitusVersion(ERROR); + int64 shardId = PG_GETARG_INT64(0); + uint64 shardSize = UpdateShardStatistics(shardId); PG_RETURN_INT64(shardSize); @@ -376,10 +376,10 @@ citus_update_shard_statistics(PG_FUNCTION_ARGS) Datum citus_update_table_statistics(PG_FUNCTION_ARGS) { - Oid distributedTableId = PG_GETARG_OID(0); - CheckCitusVersion(ERROR); + Oid distributedTableId = PG_GETARG_OID(0); + UpdateTableStatistics(distributedTableId); PG_RETURN_VOID(); diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 1120422b7..e79ab6a14 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -901,13 +901,13 @@ AppendShardIdToName(char **name, uint64 shardId) Datum shard_name(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); int64 shardId = PG_GETARG_INT64(1); char *qualifiedName = NULL; - CheckCitusVersion(ERROR); - if (shardId <= 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 6becf863c..84739cb49 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -39,6 +39,8 @@ PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph); Datum get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; HASH_SEQ_STATUS status; @@ -47,8 +49,6 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) Datum values[2]; bool isNulls[2]; - CheckCitusVersion(ERROR); - Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); WaitGraph *waitGraph = BuildGlobalWaitGraph(); HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); diff --git a/src/backend/distributed/test/foreign_key_relationship_query.c b/src/backend/distributed/test/foreign_key_relationship_query.c index a03856f54..bae2e77e0 100644 --- a/src/backend/distributed/test/foreign_key_relationship_query.c +++ b/src/backend/distributed/test/foreign_key_relationship_query.c @@ -77,11 +77,11 @@ drop_constraint_cascade_via_perform_deletion(PG_FUNCTION_ARGS) Datum get_referencing_relation_id_list(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *foreignRelationCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { @@ -136,11 +136,11 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS) Datum get_referenced_relation_id_list(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *foreignRelationCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { diff --git a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c b/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c index 4c5a21379..941c3ad6a 100644 --- a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c +++ b/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c @@ -30,11 +30,11 @@ PG_FUNCTION_INFO_V1(get_foreign_key_to_reference_table_commands); Datum get_foreign_key_to_reference_table_commands(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + FuncCallContext *functionContext = NULL; ListCell *commandsCell = NULL; - CheckCitusVersion(ERROR); - /* for the first we call this UDF, we need to populate the result to return set */ if (SRF_IS_FIRSTCALL()) { diff --git a/src/backend/distributed/test/intermediate_results.c b/src/backend/distributed/test/intermediate_results.c index 22624eb8f..b4f14bca6 100644 --- a/src/backend/distributed/test/intermediate_results.c +++ b/src/backend/distributed/test/intermediate_results.c @@ -34,6 +34,8 @@ PG_FUNCTION_INFO_V1(store_intermediate_result_on_node); Datum store_intermediate_result_on_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeNameText = PG_GETARG_TEXT_P(0); char *nodeNameString = text_to_cstring(nodeNameText); int nodePort = PG_GETARG_INT32(1); @@ -44,8 +46,6 @@ store_intermediate_result_on_node(PG_FUNCTION_ARGS) bool writeLocalFile = false; ParamListInfo paramListInfo = NULL; - CheckCitusVersion(ERROR); - WorkerNode *workerNode = FindWorkerNodeOrError(nodeNameString, nodePort); /* diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index 14b16be18..e0b7d806c 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -83,13 +83,13 @@ AllowNonIdleTransactionOnXactHandling(void) Datum start_session_level_connection_to_node(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *nodeName = PG_GETARG_TEXT_P(0); uint32 nodePort = PG_GETARG_UINT32(1); char *nodeNameString = text_to_cstring(nodeName); int connectionFlags = 0; - CheckCitusVersion(ERROR); - if (singleConnection != NULL && (strcmp(singleConnection->hostname, nodeNameString) != 0 || singleConnection->port != nodePort)) diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 8026e50c1..45c2ff2a5 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -106,6 +106,8 @@ PG_FUNCTION_INFO_V1(get_all_active_transactions); Datum assign_distributed_transaction_id(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid userId = GetUserId(); /* prepare data before acquiring spinlock to protect against errors */ @@ -113,8 +115,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) uint64 transactionNumber = PG_GETARG_INT64(1); TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2); - CheckCitusVersion(ERROR); - /* MyBackendData should always be avaliable, just out of paranoia */ if (!MyBackendData) { @@ -166,14 +166,14 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) Datum get_current_transaction_id(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; Datum values[5]; bool isNulls[5]; - CheckCitusVersion(ERROR); - /* build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) { @@ -225,12 +225,13 @@ get_current_transaction_id(PG_FUNCTION_ARGS) Datum get_global_active_transactions(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + TupleDesc tupleDescriptor = NULL; List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); List *connectionList = NIL; StringInfo queryToSend = makeStringInfo(); - CheckCitusVersion(ERROR); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY); @@ -336,9 +337,9 @@ get_global_active_transactions(PG_FUNCTION_ARGS) Datum get_all_active_transactions(PG_FUNCTION_ARGS) { - TupleDesc tupleDescriptor = NULL; - CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); StoreAllActiveTransactions(tupleStore, tupleDescriptor); diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index b674350ca..91c9c2882 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -70,6 +70,9 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation); Datum mark_tables_colocated(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + Oid sourceRelationId = PG_GETARG_OID(0); ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); @@ -80,8 +83,6 @@ mark_tables_colocated(PG_FUNCTION_ARGS) "operation"))); } - CheckCitusVersion(ERROR); - EnsureCoordinator(); EnsureTableOwner(sourceRelationId); Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); @@ -108,11 +109,12 @@ mark_tables_colocated(PG_FUNCTION_ARGS) Datum update_distributed_table_colocation(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureCoordinator(); + Oid targetRelationId = PG_GETARG_OID(0); text *colocateWithTableNameText = PG_GETARG_TEXT_P(1); - CheckCitusVersion(ERROR); - EnsureCoordinator(); EnsureTableOwner(targetRelationId); char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 2008eddb0..947740096 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(column_to_column_name); Datum column_name_to_column(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *columnText = PG_GETARG_TEXT_P(1); char *columnName = text_to_cstring(columnText); - CheckCitusVersion(ERROR); - Relation relation = relation_open(relationId, AccessShareLock); Var *column = BuildDistributionKeyFromColumnName(relation, columnName); @@ -100,13 +100,13 @@ column_name_to_column_id(PG_FUNCTION_ARGS) Datum column_to_column_name(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); text *columnNodeText = PG_GETARG_TEXT_P(1); char *columnNodeString = text_to_cstring(columnNodeText); - CheckCitusVersion(ERROR); - char *columnName = ColumnToColumnName(relationId, columnNodeString); text *columnText = cstring_to_text(columnName); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 602157339..3861c2ac1 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -97,11 +97,11 @@ PG_FUNCTION_INFO_V1(lock_relation_if_exists); Datum lock_shard_metadata(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); - CheckCitusVersion(ERROR); - if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); @@ -134,11 +134,11 @@ lock_shard_metadata(PG_FUNCTION_ARGS) Datum lock_shard_resources(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); - CheckCitusVersion(ERROR); - if (ARR_NDIM(shardIdArrayObject) == 0) { ereport(ERROR, (errmsg("no locks specified"))); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 823a5035f..f41e60f21 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(master_expire_table_cache); Datum worker_fetch_partition_file(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 partitionTaskId = PG_GETARG_UINT32(1); uint32 partitionFileId = PG_GETARG_UINT32(2); @@ -115,8 +117,6 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS) */ bool taskDirectoryExists = DirectoryExists(taskDirectoryName); - CheckCitusVersion(ERROR); - if (!taskDirectoryExists) { InitTaskDirectory(jobId, upstreamTaskId); @@ -383,6 +383,8 @@ CitusDeleteFile(const char *filename) Datum worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 shardId = PG_GETARG_INT64(0); text *schemaNameText = PG_GETARG_TEXT_P(1); text *ddlCommandText = PG_GETARG_TEXT_P(2); @@ -391,8 +393,6 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); - CheckCitusVersion(ERROR); - /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_QUERY, NULL, @@ -410,6 +410,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 leftShardId = PG_GETARG_INT64(0); text *leftShardSchemaNameText = PG_GETARG_TEXT_P(1); uint64 rightShardId = PG_GETARG_INT64(2); @@ -421,8 +423,6 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) const char *ddlCommand = text_to_cstring(ddlCommandText); Node *ddlCommandNode = ParseTreeNode(ddlCommand); - CheckCitusVersion(ERROR); - /* extend names in ddl command and apply extended command */ RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, @@ -443,6 +443,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_apply_sequence_command(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *commandText = PG_GETARG_TEXT_P(0); Oid sequenceTypeId = PG_GETARG_OID(1); const char *commandString = text_to_cstring(commandText); @@ -450,8 +452,6 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) NodeTag nodeType = nodeTag(commandNode); - CheckCitusVersion(ERROR); - if (nodeType != T_CreateSeqStmt) { ereport(ERROR, @@ -579,6 +579,8 @@ ParseTreeRawStmt(const char *ddlCommand) Datum worker_append_table_to_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *shardQualifiedNameText = PG_GETARG_TEXT_P(0); text *sourceQualifiedNameText = PG_GETARG_TEXT_P(1); text *sourceNodeNameText = PG_GETARG_TEXT_P(2); @@ -596,8 +598,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) Oid savedUserId = InvalidOid; int savedSecurityContext = 0; - CheckCitusVersion(ERROR); - /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 682a2d95c..165eb13d1 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -49,15 +49,15 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table); Datum worker_drop_distributed_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + EnsureSuperUser(); + text *relationName = PG_GETARG_TEXT_P(0); Oid relationId = ResolveRelationId(relationName, true); ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 }; char relationKind = '\0'; - CheckCitusVersion(ERROR); - EnsureSuperUser(); - if (!OidIsValid(relationId)) { ereport(NOTICE, (errmsg("relation %s does not exist, skipping", diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c index b2a473dea..5a5535560 100644 --- a/src/backend/distributed/worker/worker_file_access_protocol.c +++ b/src/backend/distributed/worker/worker_file_access_protocol.c @@ -35,13 +35,13 @@ PG_FUNCTION_INFO_V1(worker_find_block_local_path); Datum worker_foreign_file_path(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + text *foreignTableName = PG_GETARG_TEXT_P(0); text *foreignFilePath = NULL; Oid relationId = ResolveRelationId(foreignTableName, false); ForeignTable *foreignTable = GetForeignTable(relationId); - CheckCitusVersion(ERROR); - DefElem *option = NULL; foreach_ptr(option, foreignTable->options) { @@ -75,6 +75,8 @@ worker_foreign_file_path(PG_FUNCTION_ARGS) Datum worker_find_block_local_path(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + int64 blockId = PG_GETARG_INT64(0); ArrayType *dataDirectoryObject = PG_GETARG_ARRAYTYPE_P(1); @@ -82,8 +84,6 @@ worker_find_block_local_path(PG_FUNCTION_ARGS) (void) blockId; (void) dataDirectoryObject; - CheckCitusVersion(ERROR); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("called function is currently unsupported"))); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 8b1f07870..a539a9c90 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -68,14 +68,14 @@ PG_FUNCTION_INFO_V1(worker_repartition_cleanup); Datum worker_create_schema(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); text *ownerText = PG_GETARG_TEXT_P(1); char *ownerString = TextDatumGetCString(ownerText); StringInfo jobSchemaName = JobSchemaName(jobId); - CheckCitusVersion(ERROR); - bool schemaExists = JobSchemaExists(jobSchemaName); if (!schemaExists) { @@ -144,12 +144,12 @@ CreateJobSchema(StringInfo schemaName, char *schemaOwner) Datum worker_repartition_cleanup(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); StringInfo jobDirectoryName = JobDirectoryName(jobId); StringInfo jobSchemaName = JobSchemaName(jobId); - CheckCitusVersion(ERROR); - Oid schemaId = get_namespace_oid(jobSchemaName->data, false); EnsureSchemaOwner(schemaId); @@ -173,6 +173,8 @@ worker_repartition_cleanup(PG_FUNCTION_ARGS) Datum worker_merge_files_into_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); ArrayType *columnNameObject = PG_GETARG_ARRAYTYPE_P(2); @@ -189,8 +191,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) int32 columnNameCount = ArrayObjectCount(columnNameObject); int32 columnTypeCount = ArrayObjectCount(columnTypeObject); - CheckCitusVersion(ERROR); - if (columnNameCount != columnTypeCount) { ereport(ERROR, (errmsg("column name array size: %d and type array size: %d" @@ -264,11 +264,11 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) Datum worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + ScanKey scanKey = NULL; int scanKeyCount = 0; - CheckCitusVersion(ERROR); - Relation pgNamespace = table_open(NamespaceRelationId, AccessExclusiveLock); TableScanDesc scanDescriptor = table_beginscan_catalog(pgNamespace, scanKeyCount, scanKey); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 0da3a5bb0..7dc6c75ee 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -109,6 +109,8 @@ PG_FUNCTION_INFO_V1(worker_hash_partition_table); Datum worker_range_partition_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); text *filterQueryText = PG_GETARG_TEXT_P(2); @@ -130,8 +132,6 @@ worker_range_partition_table(PG_FUNCTION_ARGS) /* first check that array element's and partition column's types match */ Oid splitPointType = ARR_ELEMTYPE(splitPointObject); - CheckCitusVersion(ERROR); - if (splitPointType != partitionColumnType) { ereport(ERROR, (errmsg("partition column type %u and split point type %u " @@ -188,6 +188,8 @@ worker_range_partition_table(PG_FUNCTION_ARGS) Datum worker_hash_partition_table(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + uint64 jobId = PG_GETARG_INT64(0); uint32 taskId = PG_GETARG_UINT32(1); text *filterQueryText = PG_GETARG_TEXT_P(2); @@ -209,8 +211,6 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject); int32 partitionCount = ArrayObjectCount(hashRangeObject); - CheckCitusVersion(ERROR); - HashPartitionContext *partitionContext = palloc0(sizeof(HashPartitionContext)); partitionContext->syntheticShardIntervalArray = SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount); diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index e0c0d0747..9f11ebf6d 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -38,11 +38,11 @@ PG_FUNCTION_INFO_V1(relation_is_a_known_shard); Datum relation_is_a_known_shard(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); bool onlySearchPath = true; - CheckCitusVersion(ERROR); - PG_RETURN_BOOL(RelationIsAKnownShard(relationId, onlySearchPath)); } @@ -55,12 +55,12 @@ relation_is_a_known_shard(PG_FUNCTION_ARGS) Datum citus_table_is_visible(PG_FUNCTION_ARGS) { + CheckCitusVersion(ERROR); + Oid relationId = PG_GETARG_OID(0); char relKind = '\0'; bool onlySearchPath = true; - CheckCitusVersion(ERROR); - /* * We don't want to deal with not valid/existing relations * as pg_table_is_visible does. diff --git a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c index c4149dca2..9c7c5af78 100644 --- a/src/backend/distributed/worker/worker_truncate_trigger_protocol.c +++ b/src/backend/distributed/worker/worker_truncate_trigger_protocol.c @@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(worker_create_truncate_trigger); Datum worker_create_truncate_trigger(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - - EnsureSuperUser(); CheckCitusVersion(ERROR); + EnsureSuperUser(); + + Oid relationId = PG_GETARG_OID(0); /* Create the truncate trigger */ CreateTruncateTrigger(relationId);