diff --git a/citus.control b/citus.control
index 7dc8c29f6..c65a4f18b 100644
--- a/citus.control
+++ b/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '8.0-10'
+default_version = '8.0-11'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index b8db6fc77..346ee3db8 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	7.3-1 7.3-2 7.3-3 \
 	7.4-1 7.4-2 7.4-3 \
 	7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
-	8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10
+	8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10 8.0-11
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -235,6 +235,8 @@ $(EXTENSION)--8.0-9.sql: $(EXTENSION)--8.0-8.sql $(EXTENSION)--8.0-8--8.0-9.sql
 	cat $^ > $@
 $(EXTENSION)--8.0-10.sql: $(EXTENSION)--8.0-9.sql $(EXTENSION)--8.0-9--8.0-10.sql
 	cat $^ > $@
+$(EXTENSION)--8.0-11.sql: $(EXTENSION)--8.0-10.sql $(EXTENSION)--8.0-10--8.0-11.sql
+	cat $^ > $@
 
 NO_PGXS = 1
diff --git a/src/backend/distributed/citus--8.0-10--8.0-11.sql b/src/backend/distributed/citus--8.0-10--8.0-11.sql
new file mode 100644
index 000000000..d96d45b46
--- /dev/null
+++ b/src/backend/distributed/citus--8.0-10--8.0-11.sql
@@ -0,0 +1,21 @@
+/* citus--8.0-10--8.0-11 */
+SET search_path = 'pg_catalog';
+
+-- Deprecated functions
+DROP FUNCTION IF EXISTS worker_hash_partition_table(bigint,integer,text,text,oid,integer);
+DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
+DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
+DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
+DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text);
+REVOKE ALL ON FUNCTION create_insert_proxy_for_table(regclass,regclass) FROM PUBLIC;
+
+-- Testing functions
+REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
+REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;
+
+-- Maintenance function
+REVOKE ALL ON FUNCTION worker_cleanup_job_schema_cache() FROM PUBLIC;
+REVOKE ALL ON FUNCTION recover_prepared_transactions() FROM PUBLIC;
+REVOKE ALL ON FUNCTION check_distributed_deadlocks() FROM PUBLIC;
+
+RESET search_path;
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index 7dc8c29f6..c65a4f18b 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '8.0-10'
+default_version = '8.0-11'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/citus.sql b/src/backend/distributed/citus.sql
index b068a3bae..35bb3b128 100644
--- a/src/backend/distributed/citus.sql
+++ b/src/backend/distributed/citus.sql
@@ -219,28 +219,6 @@ COMMENT ON FUNCTION worker_fetch_partition_file(bigint, integer, integer, intege
                                             integer)
     IS 'fetch partition file from remote node';
 
-CREATE FUNCTION worker_fetch_query_results_file(bigint, integer, integer, text, integer)
-    RETURNS void
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_fetch_query_results_file$$;
-COMMENT ON FUNCTION worker_fetch_query_results_file(bigint, integer, integer, text,
-                                                    integer)
-    IS 'fetch query results file from remote node';
-
-CREATE FUNCTION worker_fetch_foreign_file(text, bigint, text[], integer[])
-    RETURNS void
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$;
-COMMENT ON FUNCTION worker_fetch_foreign_file(text, bigint, text[], integer[])
-    IS 'fetch foreign file from remote node and apply file';
-
-CREATE FUNCTION worker_fetch_regular_table(text, bigint, text[], integer[])
-    RETURNS void
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$;
-COMMENT ON FUNCTION worker_fetch_regular_table(text, bigint, text[], integer[])
-    IS 'fetch PostgreSQL table from remote node';
-
 CREATE FUNCTION worker_range_partition_table(bigint, integer, text, text, oid, anyarray)
     RETURNS void
     LANGUAGE C STRICT
@@ -278,20 +256,6 @@ CREATE FUNCTION worker_cleanup_job_schema_cache()
 COMMENT ON FUNCTION worker_cleanup_job_schema_cache()
     IS 'cleanup all job schemas in current database';
 
-CREATE FUNCTION worker_foreign_file_path(text)
-    RETURNS text
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_foreign_file_path$$;
-COMMENT ON FUNCTION worker_foreign_file_path(text)
-    IS 'get a foreign table''s local file path';
-
-CREATE FUNCTION worker_find_block_local_path(bigint, text[])
-    RETURNS text
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_find_block_local_path$$;
-COMMENT ON FUNCTION worker_find_block_local_path(bigint, text[])
-    IS 'find an HDFS block''s local file path';
-
 CREATE FUNCTION worker_apply_shard_ddl_command(bigint, text)
     RETURNS void
     LANGUAGE C STRICT
diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c
index d0533158a..7fa65acb5 100644
--- a/src/backend/distributed/worker/worker_data_fetch_protocol.c
+++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c
@@ -71,7 +71,6 @@ static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(worker_fetch_partition_file);
-PG_FUNCTION_INFO_V1(worker_fetch_query_results_file);
 PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
 PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
 PG_FUNCTION_INFO_V1(worker_apply_sequence_command);
@@ -81,6 +80,7 @@ PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
  * Following UDFs are stub functions, you can check their comments for more
  * detail.
  */
+PG_FUNCTION_INFO_V1(worker_fetch_query_results_file);
 PG_FUNCTION_INFO_V1(worker_fetch_regular_table);
 PG_FUNCTION_INFO_V1(worker_fetch_foreign_file);
 PG_FUNCTION_INFO_V1(master_expire_table_cache);
@@ -132,52 +132,6 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
 }
 
 
-/*
- * worker_fetch_query_results_file fetches a query results file from the remote
- * node. The function assumes an upstream compute task depends on this query
- * results file, and therefore directly fetches the file into the upstream
- * task's directory.
- */
-Datum
-worker_fetch_query_results_file(PG_FUNCTION_ARGS)
-{
-	uint64 jobId = PG_GETARG_INT64(0);
-	uint32 queryTaskId = PG_GETARG_UINT32(1);
-	uint32 upstreamTaskId = PG_GETARG_UINT32(2);
-	text *nodeNameText = PG_GETARG_TEXT_P(3);
-	uint32 nodePort = PG_GETARG_UINT32(4);
-	char *nodeName = NULL;
-
-	/* remote filename is <jobId>/<queryTaskId> */
-	StringInfo remoteDirectoryName = JobDirectoryName(jobId);
-	StringInfo remoteFilename = TaskFilename(remoteDirectoryName, queryTaskId);
-
-	/* local filename is <jobId>/<upstreamTaskId>/<queryTaskId> */
-	StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId);
-	StringInfo taskFilename = UserTaskFilename(taskDirectoryName, queryTaskId);
-
-	/*
-	 * If we are the first function to fetch a file for the upstream task, the
-	 * task directory does not exist. We then lock and create the directory.
-	 */
-	bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
-
-	CheckCitusVersion(ERROR);
-
-	if (!taskDirectoryExists)
-	{
-		InitTaskDirectory(jobId, upstreamTaskId);
-	}
-
-	nodeName = text_to_cstring(nodeNameText);
-
-	/* we've made sure the file names are sanitized, safe to fetch as superuser */
-	FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
-
-	PG_RETURN_VOID();
-}
-
-
 /* Constructs a standardized task file path for given directory and task id. */
 StringInfo
 TaskFilename(StringInfo directoryName, uint32 taskId)
@@ -1007,6 +961,20 @@ SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg)
 }
 
 
+/*
+ * worker_fetch_query_results_file is a stub UDF to allow the function object
+ * to be re-created during upgrades. We should keep this around until we drop
+ * support for Postgres 11, since Postgres 11 is the highest version for which
+ * this object may have been created.
+ */
+Datum
+worker_fetch_query_results_file(PG_FUNCTION_ARGS)
+{
+	ereport(DEBUG2, (errmsg("this function is deprecated and no longer is used")));
+	PG_RETURN_VOID();
+}
+
+
 /*
  * worker_fetch_regular_table UDF is a stub UDF to install Citus flawlessly.
  * Otherwise we need to delete them from our sql files, which is confusing
diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c
index d5ca9b44f..0482ff3a9 100644
--- a/src/backend/distributed/worker/worker_partition_protocol.c
+++ b/src/backend/distributed/worker/worker_partition_protocol.c
@@ -80,7 +80,6 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil
 static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
 static uint32 RangePartitionId(Datum partitionValue, const void *context);
 static uint32 HashPartitionId(Datum partitionValue, const void *context);
-static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
 static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
 static bool FileIsLink(char *filename, struct stat filestat);
 
@@ -187,71 +186,32 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 	text *filterQueryText = PG_GETARG_TEXT_P(2);
 	text *partitionColumnText = PG_GETARG_TEXT_P(3);
 	Oid partitionColumnType = PG_GETARG_OID(4);
-	ArrayType *hashRangeObject = NULL;
+	ArrayType *hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
 	const char *filterQuery = text_to_cstring(filterQueryText);
 	const char *partitionColumn = text_to_cstring(partitionColumnText);
 	HashPartitionContext *partitionContext = NULL;
 	FmgrInfo *hashFunction = NULL;
-	Datum *hashRangeArray = NULL;
-	int32 partitionCount = 0;
+	Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject);
+	int32 partitionCount = ArrayObjectCount(hashRangeObject);
 	StringInfo taskDirectory = NULL;
 	StringInfo taskAttemptDirectory = NULL;
 	FileOutputStream *partitionFileArray = NULL;
 	uint32 fileCount = 0;
 
-	uint32 (*HashPartitionIdFunction)(Datum, const void *);
-
-	Oid partitionBucketOid = InvalidOid;
+	uint32 (*hashPartitionIdFunction)(Datum, const void *);
 
 	CheckCitusVersion(ERROR);
 
 	partitionContext = palloc0(sizeof(HashPartitionContext));
+	partitionContext->syntheticShardIntervalArray =
+		SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
+	partitionContext->hasUniformHashDistribution =
+		HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
+								   partitionCount);
 
-	/*
-	 * We do this hack for backward compatibility.
-	 *
-	 * In the older versions of Citus, worker_hash_partition_table()'s 6th parameter
-	 * was an integer which denoted the number of buckets to split the shard's data.
-	 * In the later versions of Citus, the sixth parameter is changed to get an array
-	 * of shard ranges, which is used as the ranges to split the shard's data.
-	 *
-	 * Keeping this value is important if the coordinator's Citus version is <= 7.3
-	 * and worker Citus version is > 7.3.
-	 */
-	partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5);
-	if (partitionBucketOid == INT4ARRAYOID)
-	{
-		hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
-
-		hashRangeArray = DeconstructArrayObject(hashRangeObject);
-		partitionCount = ArrayObjectCount(hashRangeObject);
-
-		partitionContext->syntheticShardIntervalArray =
-			SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
-		partitionContext->hasUniformHashDistribution =
-			HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
-									   partitionCount);
-
-		HashPartitionIdFunction = &HashPartitionId;
-	}
-	else if (partitionBucketOid == INT4OID)
-	{
-		partitionCount = PG_GETARG_UINT32(5);
-
-		partitionContext->syntheticShardIntervalArray =
-			GenerateSyntheticShardIntervalArray(partitionCount);
-		partitionContext->hasUniformHashDistribution = true;
-
-		HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI;
-	}
-	else
-	{
-		/* we should never get other type of parameters */
-		ereport(ERROR, (errmsg("unexpected parameter for "
-							   "worker_hash_partition_table()")));
-	}
+	hashPartitionIdFunction = &HashPartitionId;
 
 	/* use column's type information to get the hashing function */
 	hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
@@ -278,7 +238,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 
 	/* call the partitioning function that does the actual work */
 	FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
-							HashPartitionIdFunction, (const void *) partitionContext,
+							hashPartitionIdFunction, (const void *) partitionContext,
 							partitionFileArray, fileCount);
 
 	/* close partition files and atomically rename (commit) them */
@@ -1318,37 +1278,3 @@ HashPartitionId(Datum partitionValue, const void *context)
 
 	return hashPartitionId;
 }
-
-
-/*
- * HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility
- * between the Citus versions 7.4 and older versions.
- *
- * HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value
- * using hash partitioning. More specifically, the function returns zero if the
- * given data value is null. If not, the function applies the standard Postgres
- * hashing function for the given data type, and mods the hashed result with the
- * number of partitions. The function then returns the modded number as the
- * partition number.
- *
- * Note that any changes to PostgreSQL's hashing functions will reshuffle the
- * entire distribution created by this function. For a discussion of this issue,
- * see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4."
- */
-static uint32
-HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context)
-{
-	HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
-	FmgrInfo *hashFunction = hashPartitionContext->hashFunction;
-	uint32 partitionCount = hashPartitionContext->partitionCount;
-	Datum hashDatum = 0;
-	uint32 hashResult = 0;
-	uint32 hashPartitionId = 0;
-
-	/* hash functions return unsigned 32-bit integers */
-	hashDatum = FunctionCall1(hashFunction, partitionValue);
-	hashResult = DatumGetUInt32(hashDatum);
-	hashPartitionId = (hashResult % partitionCount);
-
-	return hashPartitionId;
-}
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index c1dfc0c9c..537feaa2d 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -153,6 +153,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-7';
 ALTER EXTENSION citus UPDATE TO '8.0-8';
 ALTER EXTENSION citus UPDATE TO '8.0-9';
 ALTER EXTENSION citus UPDATE TO '8.0-10';
+ALTER EXTENSION citus UPDATE TO '8.0-11';
 -- show running version
 SHOW citus.version;
  citus.version
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index 4c2724570..4d8c4e636 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -153,6 +153,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-7';
 ALTER EXTENSION citus UPDATE TO '8.0-8';
 ALTER EXTENSION citus UPDATE TO '8.0-9';
 ALTER EXTENSION citus UPDATE TO '8.0-10';
+ALTER EXTENSION citus UPDATE TO '8.0-11';
 -- show running version
 SHOW citus.version;