Merge pull request #2492 from citusdata/udf_cleanup

Clean up UDFs and revoke unnecessary permissions
Marco Slot 2018-11-26 14:21:34 +01:00 committed by GitHub
commit 6fd5c73444
9 changed files with 54 additions and 171 deletions

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-10'
default_version = '8.0-11'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
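
With default_version bumped, a fresh CREATE EXTENSION now installs 8.0-11 directly. A minimal sketch to confirm, assuming a new database with the Citus library in shared_preload_libraries:

CREATE EXTENSION citus;
SELECT extversion FROM pg_extension WHERE extname = 'citus';  -- expect 8.0-11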

View File

@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2 7.4-3 \
7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10 8.0-11
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -235,6 +235,8 @@ $(EXTENSION)--8.0-9.sql: $(EXTENSION)--8.0-8.sql $(EXTENSION)--8.0-8--8.0-9.sql
cat $^ > $@
$(EXTENSION)--8.0-10.sql: $(EXTENSION)--8.0-9.sql $(EXTENSION)--8.0-9--8.0-10.sql
cat $^ > $@
$(EXTENSION)--8.0-11.sql: $(EXTENSION)--8.0-10.sql $(EXTENSION)--8.0-10--8.0-11.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,21 @@
/* citus--8.0-10--8.0-11 */
SET search_path = 'pg_catalog';
-- Deprecated functions
DROP FUNCTION IF EXISTS worker_hash_partition_table(bigint,integer,text,text,oid,integer);
DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text);
REVOKE ALL ON FUNCTION create_insert_proxy_for_table(regclass,regclass) FROM PUBLIC;
-- Testing functions
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;
-- Maintenance functions
REVOKE ALL ON FUNCTION worker_cleanup_job_schema_cache() FROM PUBLIC;
REVOKE ALL ON FUNCTION recover_prepared_transactions() FROM PUBLIC;
REVOKE ALL ON FUNCTION check_distributed_deadlocks() FROM PUBLIC;
RESET search_path;
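
These REVOKEs only strip the default execute privilege from PUBLIC; a superuser can still delegate individual functions after upgrading. A minimal sketch, assuming a pre-existing trusted role named admin_role (hypothetical):

ALTER EXTENSION citus UPDATE TO '8.0-11';
GRANT EXECUTE ON FUNCTION pg_catalog.recover_prepared_transactions() TO admin_role;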

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-10'
default_version = '8.0-11'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -219,28 +219,6 @@ COMMENT ON FUNCTION worker_fetch_partition_file(bigint, integer, integer, intege
integer)
IS 'fetch partition file from remote node';
CREATE FUNCTION worker_fetch_query_results_file(bigint, integer, integer, text, integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_query_results_file$$;
COMMENT ON FUNCTION worker_fetch_query_results_file(bigint, integer, integer, text,
integer)
IS 'fetch query results file from remote node';
CREATE FUNCTION worker_fetch_foreign_file(text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$;
COMMENT ON FUNCTION worker_fetch_foreign_file(text, bigint, text[], integer[])
IS 'fetch foreign file from remote node and apply file';
CREATE FUNCTION worker_fetch_regular_table(text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$;
COMMENT ON FUNCTION worker_fetch_regular_table(text, bigint, text[], integer[])
IS 'fetch PostgreSQL table from remote node';
CREATE FUNCTION worker_range_partition_table(bigint, integer, text, text, oid, anyarray)
RETURNS void
LANGUAGE C STRICT
@ -278,20 +256,6 @@ CREATE FUNCTION worker_cleanup_job_schema_cache()
COMMENT ON FUNCTION worker_cleanup_job_schema_cache()
IS 'cleanup all job schemas in current database';
CREATE FUNCTION worker_foreign_file_path(text)
RETURNS text
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_foreign_file_path$$;
COMMENT ON FUNCTION worker_foreign_file_path(text)
IS 'get a foreign table''s local file path';
CREATE FUNCTION worker_find_block_local_path(bigint, text[])
RETURNS text
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_find_block_local_path$$;
COMMENT ON FUNCTION worker_find_block_local_path(bigint, text[])
IS 'find an HDFS block''s local file path';
CREATE FUNCTION worker_apply_shard_ddl_command(bigint, text)
RETURNS void
LANGUAGE C STRICT

View File

@ -71,7 +71,6 @@ static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_fetch_partition_file);
PG_FUNCTION_INFO_V1(worker_fetch_query_results_file);
PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_sequence_command);
@ -81,6 +80,7 @@ PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
* The following UDFs are stub functions; see their comments for more
* detail.
*/
PG_FUNCTION_INFO_V1(worker_fetch_query_results_file);
PG_FUNCTION_INFO_V1(worker_fetch_regular_table);
PG_FUNCTION_INFO_V1(worker_fetch_foreign_file);
PG_FUNCTION_INFO_V1(master_expire_table_cache);
@ -132,52 +132,6 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
}
/*
* worker_fetch_query_results_file fetches a query results file from the remote
* node. The function assumes an upstream compute task depends on this query
* results file, and therefore directly fetches the file into the upstream
* task's directory.
*/
Datum
worker_fetch_query_results_file(PG_FUNCTION_ARGS)
{
uint64 jobId = PG_GETARG_INT64(0);
uint32 queryTaskId = PG_GETARG_UINT32(1);
uint32 upstreamTaskId = PG_GETARG_UINT32(2);
text *nodeNameText = PG_GETARG_TEXT_P(3);
uint32 nodePort = PG_GETARG_UINT32(4);
char *nodeName = NULL;
/* remote filename is <jobId>/<queryTaskId> */
StringInfo remoteDirectoryName = JobDirectoryName(jobId);
StringInfo remoteFilename = TaskFilename(remoteDirectoryName, queryTaskId);
/* local filename is <jobId>/<upstreamTaskId>/<queryTaskId> */
StringInfo taskDirectoryName = TaskDirectoryName(jobId, upstreamTaskId);
StringInfo taskFilename = UserTaskFilename(taskDirectoryName, queryTaskId);
/*
* If we are the first function to fetch a file for the upstream task, the
* task directory does not exist. We then lock and create the directory.
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
}
nodeName = text_to_cstring(nodeNameText);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
/* Constructs a standardized task file path for given directory and task id. */
StringInfo
TaskFilename(StringInfo directoryName, uint32 taskId)
@ -1007,6 +961,20 @@ SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg)
}
/*
* worker_fetch_query_results_file is a stub UDF to allow the function object
* to be re-created during upgrades. We should keep this around until we drop
* support for Postgres 11, since Postgres 11 is the highest version for which
* this object may have been created.
*/
Datum
worker_fetch_query_results_file(PG_FUNCTION_ARGS)
{
ereport(DEBUG2, (errmsg("this function is deprecated and is no longer used")));
PG_RETURN_VOID();
}
/*
* worker_fetch_regular_table is a stub UDF kept so that Citus installs flawlessly;
* otherwise we would need to delete it from our sql files, which is confusing
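
The stub matters in the window where the 8.0-11 shared library is already installed but the extension is still at an older version, so the SQL object still exists and can be called. In that state the call is a harmless no-op; a hypothetical session (the ids and host are made up):

SET client_min_messages TO debug2;
SELECT worker_fetch_query_results_file(42, 1, 2, 'localhost', 5432);
-- DEBUG:  this function is deprecated and is no longer used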

View File

@ -80,7 +80,6 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static uint32 RangePartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
static bool FileIsLink(char *filename, struct stat filestat);
@ -187,71 +186,32 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
text *filterQueryText = PG_GETARG_TEXT_P(2);
text *partitionColumnText = PG_GETARG_TEXT_P(3);
Oid partitionColumnType = PG_GETARG_OID(4);
ArrayType *hashRangeObject = NULL;
ArrayType *hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
const char *filterQuery = text_to_cstring(filterQueryText);
const char *partitionColumn = text_to_cstring(partitionColumnText);
HashPartitionContext *partitionContext = NULL;
FmgrInfo *hashFunction = NULL;
Datum *hashRangeArray = NULL;
int32 partitionCount = 0;
Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject);
int32 partitionCount = ArrayObjectCount(hashRangeObject);
StringInfo taskDirectory = NULL;
StringInfo taskAttemptDirectory = NULL;
FileOutputStream *partitionFileArray = NULL;
uint32 fileCount = 0;
uint32 (*HashPartitionIdFunction)(Datum, const void *);
Oid partitionBucketOid = InvalidOid;
uint32 (*hashPartitionIdFunction)(Datum, const void *);
CheckCitusVersion(ERROR);
partitionContext = palloc0(sizeof(HashPartitionContext));
partitionContext->syntheticShardIntervalArray =
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
partitionContext->hasUniformHashDistribution =
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
partitionCount);
/*
* We do this hack for backward compatibility.
*
* In the older versions of Citus, worker_hash_partition_table()'s 6th parameter
* was an integer which denoted the number of buckets to split the shard's data.
* In the later versions of Citus, the sixth parameter is changed to get an array
* of shard ranges, which is used as the ranges to split the shard's data.
*
* Keeping this value is important if the coordinator's Citus version is <= 7.3
* and worker Citus version is > 7.3.
*/
partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5);
if (partitionBucketOid == INT4ARRAYOID)
{
hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
hashRangeArray = DeconstructArrayObject(hashRangeObject);
partitionCount = ArrayObjectCount(hashRangeObject);
partitionContext->syntheticShardIntervalArray =
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
partitionContext->hasUniformHashDistribution =
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
partitionCount);
HashPartitionIdFunction = &HashPartitionId;
}
else if (partitionBucketOid == INT4OID)
{
partitionCount = PG_GETARG_UINT32(5);
partitionContext->syntheticShardIntervalArray =
GenerateSyntheticShardIntervalArray(partitionCount);
partitionContext->hasUniformHashDistribution = true;
HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI;
}
else
{
/* we should never get other type of parameters */
ereport(ERROR, (errmsg("unexpected parameter for "
"worker_hash_partition_table()")));
}
hashPartitionIdFunction = &HashPartitionId;
/* use column's type information to get the hashing function */
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
@ -278,7 +238,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
/* call the partitioning function that does the actual work */
FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
HashPartitionIdFunction, (const void *) partitionContext,
hashPartitionIdFunction, (const void *) partitionContext,
partitionFileArray, fileCount);
/* close partition files and atomically rename (commit) them */
@ -1318,37 +1278,3 @@ HashPartitionId(Datum partitionValue, const void *context)
return hashPartitionId;
}
/*
* HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility
* between the Citus versions 7.4 and older versions.
*
* HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value
* using hash partitioning. More specifically, the function returns zero if the
* given data value is null. If not, the function applies the standard Postgres
* hashing function for the given data type, and mods the hashed result with the
* number of partitions. The function then returns the modded number as the
* partition number.
*
* Note that any changes to PostgreSQL's hashing functions will reshuffle the
* entire distribution created by this function. For a discussion of this issue,
* see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4."
*/
static uint32
HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context)
{
HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
FmgrInfo *hashFunction = hashPartitionContext->hashFunction;
uint32 partitionCount = hashPartitionContext->partitionCount;
Datum hashDatum = 0;
uint32 hashResult = 0;
uint32 hashPartitionId = 0;
/* hash functions return unsigned 32-bit integers */
hashDatum = FunctionCall1(hashFunction, partitionValue);
hashResult = DatumGetUInt32(hashDatum);
hashPartitionId = (hashResult % partitionCount);
return hashPartitionId;
}
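
With the integer overload and HashPartitionIdViaDeprecatedAPI gone, the only supported calling convention passes an int4[] holding the inclusive minimum hash values of the target ranges. A hedged sketch of an invocation; the job id, task id, shard query, and column are hypothetical:

SELECT worker_hash_partition_table(
    12345, 3,
    'SELECT * FROM orders_102008', 'o_custkey',
    'integer'::regtype::oid,
    ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);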

View File

@ -153,6 +153,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-7';
ALTER EXTENSION citus UPDATE TO '8.0-8';
ALTER EXTENSION citus UPDATE TO '8.0-9';
ALTER EXTENSION citus UPDATE TO '8.0-10';
ALTER EXTENSION citus UPDATE TO '8.0-11';
-- show running version
SHOW citus.version;
citus.version

View File

@ -153,6 +153,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-7';
ALTER EXTENSION citus UPDATE TO '8.0-8';
ALTER EXTENSION citus UPDATE TO '8.0-9';
ALTER EXTENSION citus UPDATE TO '8.0-10';
ALTER EXTENSION citus UPDATE TO '8.0-11';
-- show running version
SHOW citus.version;
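
The regression test only exercises the upgrade path; the effect of the new REVOKEs can be spot-checked from an unprivileged role, which should now lack execute rights (a minimal sketch; superusers bypass these checks):

SELECT has_function_privilege('recover_prepared_transactions()', 'EXECUTE');  -- expect f
SELECT has_function_privilege('check_distributed_deadlocks()', 'EXECUTE');    -- expect f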