mirror of https://github.com/citusdata/citus.git
Remove old worker_hash_partition_table API
parent
5a63deab2e
commit
1ec5b6c890
|
@ -2,6 +2,7 @@
|
||||||
SET search_path = 'pg_catalog';
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
-- Deprecated functions
|
-- Deprecated functions
|
||||||
|
DROP FUNCTION IF EXISTS worker_hash_partition_table(bigint,integer,text,text,oid,integer);
|
||||||
DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
|
DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
|
||||||
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
|
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
|
||||||
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
|
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
|
||||||
|
|
|
@ -80,7 +80,6 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil
|
||||||
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
|
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
|
||||||
static uint32 RangePartitionId(Datum partitionValue, const void *context);
|
static uint32 RangePartitionId(Datum partitionValue, const void *context);
|
||||||
static uint32 HashPartitionId(Datum partitionValue, const void *context);
|
static uint32 HashPartitionId(Datum partitionValue, const void *context);
|
||||||
static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
|
|
||||||
static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
|
static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
|
||||||
static bool FileIsLink(char *filename, struct stat filestat);
|
static bool FileIsLink(char *filename, struct stat filestat);
|
||||||
|
|
||||||
|
@ -187,71 +186,32 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
|
||||||
text *filterQueryText = PG_GETARG_TEXT_P(2);
|
text *filterQueryText = PG_GETARG_TEXT_P(2);
|
||||||
text *partitionColumnText = PG_GETARG_TEXT_P(3);
|
text *partitionColumnText = PG_GETARG_TEXT_P(3);
|
||||||
Oid partitionColumnType = PG_GETARG_OID(4);
|
Oid partitionColumnType = PG_GETARG_OID(4);
|
||||||
ArrayType *hashRangeObject = NULL;
|
ArrayType *hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
|
||||||
|
|
||||||
const char *filterQuery = text_to_cstring(filterQueryText);
|
const char *filterQuery = text_to_cstring(filterQueryText);
|
||||||
const char *partitionColumn = text_to_cstring(partitionColumnText);
|
const char *partitionColumn = text_to_cstring(partitionColumnText);
|
||||||
|
|
||||||
HashPartitionContext *partitionContext = NULL;
|
HashPartitionContext *partitionContext = NULL;
|
||||||
FmgrInfo *hashFunction = NULL;
|
FmgrInfo *hashFunction = NULL;
|
||||||
Datum *hashRangeArray = NULL;
|
Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject);
|
||||||
int32 partitionCount = 0;
|
int32 partitionCount = ArrayObjectCount(hashRangeObject);
|
||||||
StringInfo taskDirectory = NULL;
|
StringInfo taskDirectory = NULL;
|
||||||
StringInfo taskAttemptDirectory = NULL;
|
StringInfo taskAttemptDirectory = NULL;
|
||||||
FileOutputStream *partitionFileArray = NULL;
|
FileOutputStream *partitionFileArray = NULL;
|
||||||
uint32 fileCount = 0;
|
uint32 fileCount = 0;
|
||||||
|
|
||||||
uint32 (*HashPartitionIdFunction)(Datum, const void *);
|
uint32 (*hashPartitionIdFunction)(Datum, const void *);
|
||||||
|
|
||||||
Oid partitionBucketOid = InvalidOid;
|
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
partitionContext = palloc0(sizeof(HashPartitionContext));
|
partitionContext = palloc0(sizeof(HashPartitionContext));
|
||||||
|
partitionContext->syntheticShardIntervalArray =
|
||||||
|
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
|
||||||
|
partitionContext->hasUniformHashDistribution =
|
||||||
|
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
|
||||||
|
partitionCount);
|
||||||
|
|
||||||
/*
|
hashPartitionIdFunction = &HashPartitionId;
|
||||||
* We do this hack for backward compatibility.
|
|
||||||
*
|
|
||||||
* In the older versions of Citus, worker_hash_partition_table()'s 6th parameter
|
|
||||||
* was an integer which denoted the number of buckets to split the shard's data.
|
|
||||||
* In later versions of Citus, the sixth parameter was changed to take an array
|
|
||||||
* of shard ranges, which is used as the ranges to split the shard's data.
|
|
||||||
*
|
|
||||||
* Keeping this value is important if the coordinator's Citus version is <= 7.3
|
|
||||||
* and worker Citus version is > 7.3.
|
|
||||||
*/
|
|
||||||
partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5);
|
|
||||||
if (partitionBucketOid == INT4ARRAYOID)
|
|
||||||
{
|
|
||||||
hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
|
|
||||||
|
|
||||||
hashRangeArray = DeconstructArrayObject(hashRangeObject);
|
|
||||||
partitionCount = ArrayObjectCount(hashRangeObject);
|
|
||||||
|
|
||||||
partitionContext->syntheticShardIntervalArray =
|
|
||||||
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
|
|
||||||
partitionContext->hasUniformHashDistribution =
|
|
||||||
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
|
|
||||||
partitionCount);
|
|
||||||
|
|
||||||
HashPartitionIdFunction = &HashPartitionId;
|
|
||||||
}
|
|
||||||
else if (partitionBucketOid == INT4OID)
|
|
||||||
{
|
|
||||||
partitionCount = PG_GETARG_UINT32(5);
|
|
||||||
|
|
||||||
partitionContext->syntheticShardIntervalArray =
|
|
||||||
GenerateSyntheticShardIntervalArray(partitionCount);
|
|
||||||
partitionContext->hasUniformHashDistribution = true;
|
|
||||||
|
|
||||||
HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* we should never get other type of parameters */
|
|
||||||
ereport(ERROR, (errmsg("unexpected parameter for "
|
|
||||||
"worker_hash_partition_table()")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* use column's type information to get the hashing function */
|
/* use column's type information to get the hashing function */
|
||||||
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
|
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
|
||||||
|
@ -278,7 +238,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
/* call the partitioning function that does the actual work */
|
/* call the partitioning function that does the actual work */
|
||||||
FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
|
FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
|
||||||
HashPartitionIdFunction, (const void *) partitionContext,
|
hashPartitionIdFunction, (const void *) partitionContext,
|
||||||
partitionFileArray, fileCount);
|
partitionFileArray, fileCount);
|
||||||
|
|
||||||
/* close partition files and atomically rename (commit) them */
|
/* close partition files and atomically rename (commit) them */
|
||||||
|
@ -1318,37 +1278,3 @@ HashPartitionId(Datum partitionValue, const void *context)
|
||||||
|
|
||||||
return hashPartitionId;
|
return hashPartitionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility
|
|
||||||
* between Citus 7.4 and older versions.
|
|
||||||
*
|
|
||||||
* HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value
|
|
||||||
* using hash partitioning. More specifically, the function returns zero if the
|
|
||||||
* given data value is null. If not, the function applies the standard Postgres
|
|
||||||
* hashing function for the given data type, and mods the hashed result with the
|
|
||||||
* number of partitions. The function then returns the modded number as the
|
|
||||||
* partition number.
|
|
||||||
*
|
|
||||||
* Note that any changes to PostgreSQL's hashing functions will reshuffle the
|
|
||||||
* entire distribution created by this function. For a discussion of this issue,
|
|
||||||
* search the web for "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4."
|
|
||||||
*/
|
|
||||||
static uint32
|
|
||||||
HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context)
|
|
||||||
{
|
|
||||||
HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
|
|
||||||
FmgrInfo *hashFunction = hashPartitionContext->hashFunction;
|
|
||||||
uint32 partitionCount = hashPartitionContext->partitionCount;
|
|
||||||
Datum hashDatum = 0;
|
|
||||||
uint32 hashResult = 0;
|
|
||||||
uint32 hashPartitionId = 0;
|
|
||||||
|
|
||||||
/* hash functions return unsigned 32-bit integers */
|
|
||||||
hashDatum = FunctionCall1(hashFunction, partitionValue);
|
|
||||||
hashResult = DatumGetUInt32(hashDatum);
|
|
||||||
hashPartitionId = (hashResult % partitionCount);
|
|
||||||
|
|
||||||
return hashPartitionId;
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue