Implement partitioned intermediate results.

pull/3329/head
Hadi Moshayedi 2019-12-19 13:42:19 -08:00
parent 1aef63abfb
commit d7aea7fa10
16 changed files with 1601 additions and 70 deletions
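
As a quick orientation: the new worker_partition_query_result() UDF executes a query on a worker and splits its result into one intermediate result file per partition, which can then be read back with read_intermediate_result(). A minimal sketch of the call shape, using values taken from the regression tests added below:

BEGIN;
SELECT * FROM worker_partition_query_result('squares_hash',
    'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash',
    '{-2147483648,-1073741824,0,1073741824}'::text[],
    '{-1073741825,-1,1073741823,2147483647}'::text[], false);
SELECT x, x2 FROM
    read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int);
END;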

View File

@ -92,7 +92,6 @@ static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureTableNotDistributed(Oid relationId);
static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
@ -936,7 +935,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
*
* The passed in oid has to belong to a value of citus.distribution_type.
*/
static char
char
LookupDistributionMethod(Oid distributionMethodOid)
{
char distributionMethod = 0;

View File

@ -90,9 +90,7 @@ static void SendCopyDataOverConnection(StringInfo dataBuffer,
static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver);
static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
static char * CreateIntermediateResultsDirectory(void);
static char * IntermediateResultsDirectory(void);
static char * QueryResultFileName(const char *resultId);
static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
char *copyFormat,
Datum *resultIdArray,
@ -556,7 +554,7 @@ ReceiveQueryResultViaCopy(const char *resultId)
* directory for the current transaction if it does not exist and ensures
* that the directory is removed at the end of the transaction.
*/
static char *
char *
CreateIntermediateResultsDirectory(void)
{
char *resultDirectory = IntermediateResultsDirectory();
@ -591,7 +589,7 @@ CreateIntermediateResultsDirectory(void)
* an intermediate result with the given key in the per transaction
* result directory.
*/
static char *
char *
QueryResultFileName(const char *resultId)
{
StringInfo resultFileName = makeStringInfo();

View File

@ -0,0 +1,519 @@
/*-------------------------------------------------------------------------
*
* partition_intermediate_results.c
* Functions for writing partitioned intermediate results.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "port.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_type.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
#include "nodes/primnodes.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/typcache.h"
/*
* PartitionedResultDestReceiver is used for streaming tuples into a set of
* partitioned result files.
*/
typedef struct PartitionedResultDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
/* partition file $i is stored in a file named $resultIdPrefix_$i. */
char *resultIdPrefix;
/* use binary copy format, or text copy format? */
bool binaryCopy;
/* used for deciding which partition a tuple belongs to. */
DistTableCacheEntry *shardSearchInfo;
MemoryContext perTupleContext;
/* what do the streamed tuples look like? */
TupleDesc tupleDescriptor;
/* which column of the streamed tuples is the partition column? */
int partitionColumnIndex;
/* how many partitions do we have? */
int partitionCount;
/*
* Tuples for partition[i] are sent to partitionDestReceivers[i], which
* writes it to a result file.
*/
DestReceiver **partitionDestReceivers;
} PartitionedResultDestReceiver;
static Portal StartPortalForQueryExecution(const char *queryString);
static DistTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray,
ArrayType *maxValuesArray,
char partitionMethod,
Var *partitionColumn);
static PartitionedResultDestReceiver * CreatePartitionedResultDestReceiver(
char *resultIdPrefix, int partitionColumnIndex, int partitionCount,
TupleDesc tupleDescriptor, bool binaryCopy,
DistTableCacheEntry *shardSearchInfo, MemoryContext perTupleContext);
static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *dest);
static void PartitionedResultDestReceiverShutdown(DestReceiver *destReceiver);
static void PartitionedResultDestReceiverDestroy(DestReceiver *destReceiver);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_partition_query_result);
/*
* worker_partition_query_result executes a query and writes the results into a
* set of local files according to the partition scheme and the partition column.
*/
Datum
worker_partition_query_result(PG_FUNCTION_ARGS)
{
ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
text *resultIdPrefixText = PG_GETARG_TEXT_P(0);
char *resultIdPrefixString = text_to_cstring(resultIdPrefixText);
/* verify that resultIdPrefix doesn't contain invalid characters */
QueryResultFileName(resultIdPrefixString);
text *queryText = PG_GETARG_TEXT_P(1);
char *queryString = text_to_cstring(queryText);
int partitionColumnIndex = PG_GETARG_INT32(2);
Oid partitionMethodOid = PG_GETARG_OID(3);
char partitionMethod = LookupDistributionMethod(partitionMethodOid);
if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_RANGE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("only hash and range partitiong schemes are supported")));
}
ArrayType *minValuesArray = PG_GETARG_ARRAYTYPE_P(4);
int32 minValuesCount = ArrayObjectCount(minValuesArray);
ArrayType *maxValuesArray = PG_GETARG_ARRAYTYPE_P(5);
int32 maxValuesCount = ArrayObjectCount(maxValuesArray);
bool binaryCopy = PG_GETARG_BOOL(6);
CheckCitusVersion(ERROR);
if (!IsMultiStatementTransaction())
{
ereport(ERROR, (errmsg("worker_partition_query_result can only be used in a "
"transaction block")));
}
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID.
*/
UseCoordinatedTransaction();
CreateIntermediateResultsDirectory();
if (minValuesCount != maxValuesCount)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"min values and max values must have the same number of elements")));
}
int partitionCount = minValuesCount;
if (partitionCount == 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("number of partitions cannot be 0")));
}
/* start execution early in order to extract the tuple descriptor */
Portal portal = StartPortalForQueryExecution(queryString);
/* extract the partition column */
TupleDesc tupleDescriptor = portal->tupDesc;
if (tupleDescriptor == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("query must generate a set of rows")));
}
if (partitionColumnIndex < 0 || partitionColumnIndex >= tupleDescriptor->natts)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("partition column index must be between 0 and %d",
tupleDescriptor->natts - 1)));
}
FormData_pg_attribute *partitionColumnAttr = TupleDescAttr(tupleDescriptor,
partitionColumnIndex);
Var *partitionColumn = makeVar(partitionColumnIndex, partitionColumnIndex,
partitionColumnAttr->atttypid,
partitionColumnAttr->atttypmod,
partitionColumnAttr->attcollation, 0);
/* construct an artificial DistTableCacheEntry for shard pruning */
DistTableCacheEntry *shardSearchInfo =
QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,
partitionMethod, partitionColumn);
/* prepare the output destination */
EState *estate = CreateExecutorState();
MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
PartitionedResultDestReceiver *dest =
CreatePartitionedResultDestReceiver(resultIdPrefixString, partitionColumnIndex,
partitionCount, tupleDescriptor, binaryCopy,
shardSearchInfo, tupleContext);
/* execute the query */
PortalRun(portal, FETCH_ALL, false, true, (DestReceiver *) dest,
(DestReceiver *) dest, NULL);
/* construct the output result */
TupleDesc returnTupleDesc = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &returnTupleDesc);
resultInfo->returnMode = SFRM_Materialize;
resultInfo->setResult = tupleStore;
resultInfo->setDesc = returnTupleDesc;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
uint64 recordsWritten = 0;
uint64 bytesWritten = 0;
Datum values[3];
bool nulls[3];
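/* dest receivers are created lazily, so partitions that received no tuples report zero stats */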
if (dest->partitionDestReceivers[partitionIndex] != NULL)
{
FileDestReceiverStats(dest->partitionDestReceivers[partitionIndex],
&recordsWritten, &bytesWritten);
}
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(partitionIndex);
values[1] = UInt64GetDatum(recordsWritten);
values[2] = UInt64GetDatum(bytesWritten);
tuplestore_putvalues(tupleStore, returnTupleDesc, values, nulls);
}
tuplestore_donestoring(tupleStore);
PortalDrop(portal, false);
FreeExecutorState(estate);
PG_RETURN_INT64(1);
}
/*
* StartPortalForQueryExecution creates and starts a portal which can be
* used for running the given query.
*/
static Portal
StartPortalForQueryExecution(const char *queryString)
{
Query *query = ParseQueryString(queryString, NULL, 0);
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *queryPlan = pg_plan_query(query, cursorOptions, NULL);
Portal portal = CreateNewPortal();
/* don't display the portal in pg_cursors, it is for internal use only */
portal->visible = false;
PortalDefineQuery(portal, NULL, queryString, "SELECT", list_make1(queryPlan), NULL);
int eflags = 0;
PortalStart(portal, NULL, eflags, GetActiveSnapshot());
return portal;
}
/*
* QueryTupleShardSearchInfo returns a DistTableCacheEntry which has enough
* information so that FindShardInterval() can find the shard corresponding
* to a tuple.
*/
static DistTableCacheEntry *
QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
char partitionMethod, Var *partitionColumn)
{
Datum *minValues = NULL;
Datum *maxValues = NULL;
bool *minValueNulls = NULL;
bool *maxValueNulls = NULL;
int minValuesCount = 0;
int maxValuesCount = 0;
Oid intervalTypeId = InvalidOid;
int32 intervalTypeMod = 0;
deconstruct_array(minValuesArray, TEXTOID, -1, false, 'i', &minValues,
&minValueNulls, &minValuesCount);
deconstruct_array(maxValuesArray, TEXTOID, -1, false, 'i', &maxValues,
&maxValueNulls, &maxValuesCount);
int partitionCount = minValuesCount;
Assert(maxValuesCount == partitionCount);
GetIntervalTypeInfo(partitionMethod, partitionColumn,
&intervalTypeId, &intervalTypeMod);
FmgrInfo *shardColumnCompare = GetFunctionInfo(partitionColumn->vartype,
BTREE_AM_OID, BTORDER_PROC);
FmgrInfo *shardIntervalCompare = GetFunctionInfo(intervalTypeId,
BTREE_AM_OID, BTORDER_PROC);
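/* hash partitioning also requires the hash function of the partition column's type */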
FmgrInfo *hashFunction = NULL;
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
TypeCacheEntry *typeEntry = lookup_type_cache(partitionColumn->vartype,
TYPECACHE_HASH_PROC_FINFO);
hashFunction = palloc0(sizeof(FmgrInfo));
fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CurrentMemoryContext);
}
ShardInterval **shardIntervalArray = palloc0(partitionCount *
sizeof(ShardInterval *));
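/* create a "virtual" shard interval per partition, for use by FindShardInterval() */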
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
Datum datumArray[Natts_pg_dist_shard] = {
[Anum_pg_dist_shard_logicalrelid - 1] = InvalidOid,
[Anum_pg_dist_shard_shardid - 1] = partitionIndex,
[Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL,
[Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex],
[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex]
};
bool nullsArray[Natts_pg_dist_shard] = {
[Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],
[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex]
};
shardIntervalArray[partitionIndex] =
DeformedDistShardTupleToShardInterval(datumArray, nullsArray,
intervalTypeId, intervalTypeMod);
shardIntervalArray[partitionIndex]->shardIndex = partitionIndex;
}
DistTableCacheEntry *result = palloc0(sizeof(DistTableCacheEntry));
result->partitionMethod = partitionMethod;
result->partitionColumn = partitionColumn;
result->shardIntervalCompareFunction = shardIntervalCompare;
result->shardColumnCompareFunction = shardColumnCompare;
result->hashFunction = hashFunction;
result->sortedShardIntervalArray =
SortShardIntervalArray(shardIntervalArray, partitionCount,
partitionColumn->varcollid, shardIntervalCompare);
result->hasUninitializedShardInterval =
HasUninitializedShardInterval(result->sortedShardIntervalArray, partitionCount);
result->hasOverlappingShardInterval =
result->hasUninitializedShardInterval ||
HasOverlappingShardInterval(result->sortedShardIntervalArray, partitionCount,
partitionColumn->varcollid, shardIntervalCompare);
ErrorIfInconsistentShardIntervals(result);
result->shardIntervalArrayLength = partitionCount;
return result;
}
/*
* CreatePartitionedResultDestReceiver sets up a partitioned dest receiver.
*/
static PartitionedResultDestReceiver *
CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnIndex,
int partitionCount, TupleDesc tupleDescriptor,
bool binaryCopy, DistTableCacheEntry *shardSearchInfo,
MemoryContext perTupleContext)
{
PartitionedResultDestReceiver *resultDest =
palloc0(sizeof(PartitionedResultDestReceiver));
/* set up the DestReceiver function pointers */
resultDest->pub.receiveSlot = PartitionedResultDestReceiverReceive;
resultDest->pub.rStartup = PartitionedResultDestReceiverStartup;
resultDest->pub.rShutdown = PartitionedResultDestReceiverShutdown;
resultDest->pub.rDestroy = PartitionedResultDestReceiverDestroy;
resultDest->pub.mydest = DestCopyOut;
/* set up output parameters */
resultDest->resultIdPrefix = resultIdPrefix;
resultDest->perTupleContext = perTupleContext;
resultDest->partitionColumnIndex = partitionColumnIndex;
resultDest->partitionCount = partitionCount;
resultDest->shardSearchInfo = shardSearchInfo;
resultDest->tupleDescriptor = tupleDescriptor;
resultDest->binaryCopy = binaryCopy;
resultDest->partitionDestReceivers =
(DestReceiver **) palloc0(partitionCount * sizeof(DestReceiver *));
return resultDest;
}
/*
* PartitionedResultDestReceiverStartup implements the rStartup interface of
* PartitionedResultDestReceiver.
*/
static void
PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation,
TupleDesc inputTupleDescriptor)
{
/*
* We don't expect this to be called multiple times, but if it happens,
* we will just overwrite previous files.
*/
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor);
}
}
}
/*
* PartitionedResultDestReceiverReceive implements the receiveSlot interface of
* PartitionedResultDestReceiver.
*/
static bool
PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
slot_getallattrs(slot);
Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull;
if (columnNulls[partitionedDest->partitionColumnIndex])
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column value cannot be NULL")));
}
Datum partitionColumnValue = columnValues[partitionedDest->partitionColumnIndex];
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue,
partitionedDest->shardSearchInfo);
if (shardInterval == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find shard for partition column "
"value")));
}
int partitionIndex = shardInterval->shardIndex;
DestReceiver *partitionDest = partitionedDest->partitionDestReceivers[partitionIndex];
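/* partition files are created lazily, the first time a tuple is routed to them */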
if (partitionDest == NULL)
{
StringInfo resultId = makeStringInfo();
appendStringInfo(resultId, "%s_%d", partitionedDest->resultIdPrefix,
partitionIndex);
char *filePath = QueryResultFileName(resultId->data);
partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext,
partitionedDest->binaryCopy);
partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest;
partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor);
}
partitionDest->receiveSlot(slot, partitionDest);
return true;
}
/*
* PartitionedResultDestReceiverShutdown implements the rShutdown interface of
* PartitionedResultDestReceiver.
*/
static void
PartitionedResultDestReceiverShutdown(DestReceiver *copyDest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
partitionDest->rShutdown(partitionDest);
}
}
}
/*
* PartitionedResultDestReceiverDestroy implements the rDestroy interface of
* PartitionedResultDestReceiver.
*/
static void
PartitionedResultDestReceiverDestroy(DestReceiver *copyDest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
/* this call should also free partitionDest, so no need to free it after */
partitionDest->rDestroy(partitionDest);
}
}
pfree(partitionedDest->partitionDestReceivers);
pfree(partitionedDest);
}

View File

@ -194,15 +194,9 @@ static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static void PrepareWorkerNodeCache(void);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static bool CheckInstalledVersion(int elevel);
static char * AvailableExtensionVersion(void);
static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
Oid shardIntervalCollation,
FmgrInfo *shardIntervalSortCompareFunction);
static bool CitusHasBeenLoadedInternal(void);
static void InitializeCaches(void);
static void InitializeDistCache(void);
@ -1245,22 +1239,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
cacheEntry->hasOverlappingShardInterval = true;
}
/*
* If table is hash-partitioned and has shards, there never should be
* any uninitialized shards. Historically we've not prevented that for
* range partitioned tables, but it might be a good idea to start
* doing so.
*/
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards")));
}
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasOverlappingShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has overlapping shards")));
}
ErrorIfInconsistentShardIntervals(cacheEntry);
}
/*
@ -1330,6 +1309,31 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
}
/*
* ErrorIfInconsistentShardIntervals checks if shard intervals are consistent with
* our expectations.
*/
void
ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry)
{
/*
* If table is hash-partitioned and has shards, there never should be any
* uninitialized shards. Historically we've not prevented that for range
* partitioned tables, but it might be a good idea to start doing so.
*/
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards")));
}
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasOverlappingShardInterval)
{
ereport(ERROR, (errmsg("hash partitioned table has overlapping shards")));
}
}
/*
* HasUniformHashDistribution determines whether the given list of sorted shards
* has a uniform hash distribution, as produced by master_create_worker_shards for
@ -1376,7 +1380,7 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray,
* ensure that input shard interval array is sorted on shardminvalue and uninitialized
* shard intervals are at the end of the array.
*/
static bool
bool
HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount)
{
bool hasUninitializedShardInterval = false;
@ -1406,7 +1410,7 @@ HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shar
* HasOverlappingShardInterval determines whether the given list of sorted
* shards has overlapping ranges.
*/
static bool
bool
HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
Oid shardIntervalCollation,
@ -3682,8 +3686,9 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Var *partitionColumn = (Var *) partitionNode;
Assert(IsA(partitionNode, Var));
*intervalTypeId = partitionColumn->vartype;
*intervalTypeMod = partitionColumn->vartypmod;
GetIntervalTypeInfo(partitionMethod, partitionColumn,
intervalTypeId, intervalTypeMod);
*columnTypeId = partitionColumn->vartype;
*columnTypeMod = partitionColumn->vartypmod;
break;
@ -3695,7 +3700,9 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
Var *partitionColumn = (Var *) partitionNode;
Assert(IsA(partitionNode, Var));
*intervalTypeId = INT4OID;
GetIntervalTypeInfo(partitionMethod, partitionColumn,
intervalTypeId, intervalTypeMod);
*columnTypeId = partitionColumn->vartype;
*columnTypeMod = partitionColumn->vartypmod;
break;
@ -3716,6 +3723,42 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
}
/*
* GetIntervalTypeInfo gets type id and type mod of the min/max values
* of shard intervals for a distributed table with given partition method
* and partition column.
*/
void
GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
Oid *intervalTypeId, int32 *intervalTypeMod)
{
*intervalTypeId = InvalidOid;
*intervalTypeMod = -1;
switch (partitionMethod)
{
case DISTRIBUTE_BY_APPEND:
case DISTRIBUTE_BY_RANGE:
{
*intervalTypeId = partitionColumn->vartype;
*intervalTypeMod = partitionColumn->vartypmod;
break;
}
case DISTRIBUTE_BY_HASH:
{
*intervalTypeId = INT4OID;
break;
}
default:
{
break;
}
}
}
/*
* TupleToShardInterval transforms the specified dist_shard tuple into a new
* ShardInterval using the provided descriptor and partition type information.
@ -3725,10 +3768,33 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid
intervalTypeId,
int32 intervalTypeMod)
{
Oid inputFunctionId = InvalidOid;
Oid typeIoParam = InvalidOid;
Datum datumArray[Natts_pg_dist_shard];
bool isNullArray[Natts_pg_dist_shard];
/*
* We use heap_deform_tuple() instead of heap_getattr() to expand tuple
* to contain missing values when ALTER TABLE ADD COLUMN happens.
*/
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
ShardInterval *shardInterval =
DeformedDistShardTupleToShardInterval(datumArray, isNullArray,
intervalTypeId, intervalTypeMod);
return shardInterval;
}
/*
* DeformedDistShardTupleToShardInterval transforms the specified deformed
* pg_dist_shard tuple into a new ShardInterval.
*/
ShardInterval *
DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
Oid intervalTypeId, int32 intervalTypeMod)
{
Oid inputFunctionId = InvalidOid;
Oid typeIoParam = InvalidOid;
Datum minValue = 0;
Datum maxValue = 0;
bool minValueExists = false;
@ -3738,14 +3804,8 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid
char intervalAlign = '0';
char intervalDelim = '0';
/*
* We use heap_deform_tuple() instead of heap_getattr() to expand tuple
* to contain missing values when ALTER TABLE ADD COLUMN happens.
*/
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
Oid relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid -
1]);
Oid relationId =
DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]);
int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];

View File

@ -1,5 +1,6 @@
#include "udfs/read_intermediate_results/9.2-1.sql"
#include "udfs/fetch_intermediate_results/9.2-1.sql"
#include "udfs/worker_partition_query_result/9.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid;
UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation

View File

@ -0,0 +1,16 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result(
result_prefix text,
query text,
partition_column_index int,
partition_method citus.distribution_type,
partition_min_values text[],
partition_max_values text[],
binaryCopy boolean,
OUT partition_index int,
OUT rows_written bigint,
OUT bytes_written bigint)
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$worker_partition_query_result$$;
COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean)
IS 'execute a query and partition its results into a set of local result files';

View File

@ -0,0 +1,16 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result(
result_prefix text,
query text,
partition_column_index int,
partition_method citus.distribution_type,
partition_min_values text[],
partition_max_values text[],
binaryCopy boolean,
OUT partition_index int,
OUT rows_written bigint,
OUT bytes_written bigint)
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$worker_partition_query_result$$;
COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean)
IS 'execute a query and partition its results into a set of local result files';

View File

@ -24,6 +24,8 @@
/* necessary to get S_IRUSR, S_IWUSR definitions on illumos */
#include <sys/stat.h>
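/* accumulate this many bytes of COPY data in memory before flushing to the file */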
#define COPY_BUFFER_SIZE (4 * 1024 * 1024)
/* TaskFileDestReceiver can be used to stream results into a file */
typedef struct TaskFileDestReceiver
{
@ -33,8 +35,8 @@ typedef struct TaskFileDestReceiver
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* EState for per-tuple memory allocation */
EState *executorState;
/* context for per-tuple memory allocation */
MemoryContext tupleContext;
/* MemoryContext for DestReceiver session */
MemoryContext memoryContext;
@ -48,13 +50,12 @@ typedef struct TaskFileDestReceiver
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;
/* number of tuples sent */
/* statistics */
uint64 tuplesSent;
uint64 bytesSent;
} TaskFileDestReceiver;
static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState,
bool binaryCopyFormat);
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
@ -102,9 +103,10 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
ParamListInfo paramListInfo = NULL;
EState *estate = CreateExecutorState();
MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
TaskFileDestReceiver *taskFileDest =
(TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate,
binaryCopyFormat);
(TaskFileDestReceiver *) CreateFileDestReceiver(taskFilename, tupleContext,
binaryCopyFormat);
ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
@ -118,11 +120,11 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
/*
* CreateTaskFileDestReceiver creates a DestReceiver for writing query results
* to a task file.
* CreateFileDestReceiver creates a DestReceiver for writing query results
* to a file.
*/
static DestReceiver *
CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat)
DestReceiver *
CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCopyFormat)
{
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) palloc0(
sizeof(TaskFileDestReceiver));
@ -135,7 +137,7 @@ CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCop
taskFileDest->pub.mydest = DestCopyOut;
/* set up output parameters */
taskFileDest->executorState = executorState;
taskFileDest->tupleContext = tupleContext;
taskFileDest->memoryContext = CurrentMemoryContext;
taskFileDest->filePath = pstrdup(filePath);
taskFileDest->binaryCopyFormat = binaryCopyFormat;
@ -173,7 +175,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = taskFileDest->binaryCopyFormat;
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState);
copyOutState->rowcontext = taskFileDest->tupleContext;
taskFileDest->copyOutState = copyOutState;
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
@ -187,10 +189,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
if (copyOutState->binary)
{
/* write headers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
}
MemoryContextSwitchTo(oldContext);
@ -214,8 +213,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
StringInfo copyData = copyOutState->fe_msgbuf;
EState *executorState = taskFileDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext executorTupleContext = taskFileDest->tupleContext;
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
slot_getallattrs(slot);
@ -223,19 +221,21 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull;
resetStringInfo(copyData);
/* construct row in COPY format */
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, NULL);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
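/* to avoid a file write per row, flush only once the buffer exceeds COPY_BUFFER_SIZE */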
if (copyData->len > COPY_BUFFER_SIZE)
{
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
resetStringInfo(copyData);
}
MemoryContextSwitchTo(oldContext);
taskFileDest->tuplesSent++;
ResetPerTupleExprContext(executorState);
MemoryContextReset(executorTupleContext);
return true;
}
@ -254,6 +254,8 @@ WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest)
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
taskFileDest->bytesSent += bytesWritten;
}
@ -268,12 +270,18 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
CopyOutState copyOutState = taskFileDest->copyOutState;
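/* flush any rows that are still buffered in memory */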
if (copyOutState->fe_msgbuf->len > 0)
{
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
resetStringInfo(copyOutState->fe_msgbuf);
}
if (copyOutState->binary)
{
/* write footers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
resetStringInfo(copyOutState->fe_msgbuf);
}
FileClose(taskFileDest->fileCompat.fd);
@ -302,3 +310,15 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
pfree(taskFileDest->filePath);
pfree(taskFileDest);
}
/*
* FileDestReceiverStats returns statistics for the given file dest receiver.
*/
void
FileDestReceiverStats(DestReceiver *dest, uint64 *rowsSent, uint64 *bytesSent)
{
TaskFileDestReceiver *fileDestReceiver = (TaskFileDestReceiver *) dest;
*rowsSent = fileDestReceiver->tuplesSent;
*bytesSent = fileDestReceiver->bytesSent;
}

View File

@ -29,6 +29,8 @@ extern void SendQueryResultViaCopy(const char *resultId);
extern void ReceiveQueryResultViaCopy(const char *resultId);
extern void RemoveIntermediateResultsDirectory(void);
extern int64 IntermediateResultSize(char *resultId);
extern char * QueryResultFileName(const char *resultId);
extern char * CreateIntermediateResultsDirectory(void);
#endif /* INTERMEDIATE_RESULTS_H */

View File

@ -155,6 +155,11 @@ extern Datum StringToDatum(char *inputString, Oid dataType);
extern char * DatumToString(Datum datum, Oid dataType);
extern int CompareShardPlacementsByWorker(const void *leftElement,
const void *rightElement);
extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray,
bool *isNullArray,
Oid intervalTypeId,
int32 intervalTypeMod);
extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn,
Oid *intervalTypeId, int32 *intervalTypeMod);
#endif /* MASTER_METADATA_UTILITY_H */

View File

@ -140,13 +140,20 @@ extern void InvalidateMetadataSystemCache(void);
extern Datum DistNodeMetadata(void);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength,
Oid shardIntervalCollation,
FmgrInfo *shardIntervalSortCompareFunction);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
extern bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
extern void ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry);
extern void EnsureModificationsCanRun(void);
extern char LookupDistributionMethod(Oid distributionMethodOid);
/* access WorkerNodeHash */
extern HTAB * GetWorkerNodeHash(void);

View File

@ -58,6 +58,7 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
#define SHARD_STORAGE_FOREIGN 'f'
#define SHARD_STORAGE_TABLE 't'
#define SHARD_STORAGE_COLUMNAR 'c'
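/* virtual shards are not backed by storage; used for the artificial shard intervals of partitioned intermediate results */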
#define SHARD_STORAGE_VIRTUAL 'v'
#endif /* PG_DIST_SHARD_H */

View File

@ -136,6 +136,12 @@ extern StringInfo UserTaskFilename(StringInfo directoryName, uint32 taskId);
extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList);
extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);
extern DestReceiver * CreateFileDestReceiver(char *filePath,
MemoryContext tupleContext,
bool binaryCopyFormat);
extern void FileDestReceiverStats(DestReceiver *dest,
uint64 *rowsSent,
uint64 *bytesSent);
/* Function declaration for parsing tree node */
extern Node * ParseTreeNode(const char *ddlCommand);

View File

@ -0,0 +1,513 @@
-- Test functions for partitioning intermediate results
CREATE SCHEMA partitioned_intermediate_results;
SET search_path TO 'partitioned_intermediate_results';
-- hash partitioned intermediate results
BEGIN;
SELECT * FROM worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash',
'{-2147483648,-1073741824,0,1073741824}'::text[],
'{-1073741825,-1,1073741823,2147483647}'::text[], false);
partition_index | rows_written | bytes_written
-----------------+--------------+---------------
0 | 4 | 21
1 | 3 | 14
2 | 1 | 5
3 | 2 | 9
(4 rows)
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int)
ORDER BY x;
hashint4 | x | x2
-------------+----+-----
-1905060026 | 1 | 1
-1330264708 | 5 | 25
-2047600124 | 8 | 64
-1547814713 | 10 | 100
(4 rows)
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_1', 'text') AS res (x int, x2 int)
ORDER BY x;
hashint4 | x | x2
-------------+---+----
-28094569 | 3 | 9
-1011077333 | 4 | 16
-978793473 | 7 | 49
(3 rows)
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_2', 'text') AS res (x int, x2 int)
ORDER BY x;
hashint4 | x | x2
-----------+---+----
566031088 | 6 | 36
(1 row)
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_3', 'text') AS res (x int, x2 int)
ORDER BY x;
hashint4 | x | x2
------------+---+----
1134484726 | 2 | 4
1672378334 | 9 | 81
(2 rows)
END;
-- range partitioned intermediate results
BEGIN;
SELECT * FROM worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, /* partition by x^2 */
'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true /* binary format */);
partition_index | rows_written | bytes_written
-----------------+--------------+---------------
0 | 4 | 93
1 | 2 | 57
2 | 1 | 39
3 | 3 | 75
(4 rows)
SELECT x, x2 FROM
read_intermediate_result('squares_range_0', 'binary') AS res (x int, x2 int)
ORDER BY x;
x | x2
---+----
1 | 1
2 | 4
3 | 9
4 | 16
(4 rows)
SELECT x, x2 FROM
read_intermediate_result('squares_range_1', 'binary') AS res (x int, x2 int)
ORDER BY x;
x | x2
---+----
5 | 25
6 | 36
(2 rows)
SELECT x, x2 FROM
read_intermediate_result('squares_range_2', 'binary') AS res (x int, x2 int)
ORDER BY x;
x | x2
---+----
7 | 49
(1 row)
SELECT x, x2 FROM
read_intermediate_result('squares_range_3', 'binary') AS res (x int, x2 int)
ORDER BY x;
x | x2
----+-----
8 | 64
9 | 81
10 | 100
(3 rows)
END;
-- 1M rows, just in case. text format.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_hash',
'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'hash',
'{-2147483648,-1073741824,0,1073741824}'::text[],
'{-1073741825,-1,1073741823,2147483647}'::text[], false);
partition_index | rows_written | bytes_written
-----------------+--------------+---------------
0 | 250199 | 3586179
1 | 249872 | 3581280
2 | 250278 | 3587487
3 | 249651 | 3578401
(4 rows)
SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_hash_0',
'doubles_hash_1',
'doubles_hash_2',
'doubles_hash_3'], 'text') AS res (x int, x2 int);
count
---------
1000000
(1 row)
END;
-- 1M rows, just in case. binary format.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_range',
'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'range',
'{0,250001,500001,750001}'::text[],
'{250000,500000,750000,1000000}'::text[], true);
partition_index | rows_written | bytes_written
-----------------+--------------+---------------
0 | 250000 | 4500021
1 | 250000 | 4500021
2 | 250000 | 4500021
3 | 250000 | 4500021
(4 rows)
SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_range_0',
'doubles_range_1',
'doubles_range_2',
'doubles_range_3'], 'binary') AS res (x int, x2 int);
count
---------
1000000
(1 row)
END;
--
-- Some error cases
--
-- not allowed outside transaction block
SELECT * FROM worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range', '{0}'::text[], '{20}'::text[], true);
ERROR: worker_partition_query_result can only be used in a transaction block
BEGIN;
SAVEPOINT s1;
-- syntax error in query
SELECT worker_partition_query_result('squares_range',
'SELECxT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: syntax error at or near "SELECxT"
LINE 1: SELECT worker_partition_query_result('squares_range',
^
ROLLBACK TO SAVEPOINT s1;
-- invalid result prefix
SELECT worker_partition_query_result('squares_range/a/',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: result key "squares_range/a/" contains invalid character
HINT: Result keys may only contain letters, numbers, underscores and hyphens.
ROLLBACK TO SAVEPOINT s1;
-- empty min/max values
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range', ARRAY[]::text[], ARRAY[]::text[], true);
ERROR: number of partitions cannot be 0
ROLLBACK TO SAVEPOINT s1;
-- append partitioning
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'append',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: only hash and range partitioning schemes are supported
ROLLBACK TO SAVEPOINT s1;
-- query with no results
CREATE TABLE t(a int);
SELECT worker_partition_query_result('squares_range',
'INSERT INTO t VALUES (1), (2)',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: query must generate a set of rows
ROLLBACK TO SAVEPOINT s1;
-- negative partition index
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
-1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: partition column index must be between 0 and 1
ROLLBACK TO SAVEPOINT s1;
-- too large partition index
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
2, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: partition column index must be between 0 and 1
ROLLBACK TO SAVEPOINT s1;
-- min/max values of different lengths
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61,101}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: min values and max values must have the same number of elements
ROLLBACK TO SAVEPOINT s1;
-- null values in min/max values of hash partitioned results
SELECT worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'hash',
'{NULL,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: hash partitioned table has uninitialized shards
ROLLBACK TO SAVEPOINT s1;
-- multiple queries
SELECT worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i; SELECT 4, 16;',
1, 'hash',
'{NULL,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ERROR: cannot execute multiple utility events
ROLLBACK TO SAVEPOINT s1;
ROLLBACK;
--
-- Procedure for conveniently testing worker_partition_query_result(). It uses
-- worker_partition_query_result() to partition the result of a query using the
-- same scheme as the distributed table rel, and then checks whether it
-- partitioned the rows the same way as the shards of rel.
--
CREATE OR REPLACE PROCEDURE test_partition_query_results(rel regclass, query text,
binaryCopy boolean DEFAULT true)
AS $$
DECLARE
partition_min_values text[];
partition_max_values text[];
partition_column_index int;
partition_method citus.distribution_type;
partitioned_results_row_counts text[];
distributed_table_row_counts text[];
tuple_def text;
partition_result_names text[];
non_empty_partitions int[];
rows_different int;
BEGIN
-- get tuple definition
SELECT string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ' ORDER BY a.attnum)
INTO tuple_def
FROM pg_catalog.pg_attribute a
WHERE a.attrelid = rel::oid AND a.attnum > 0 AND NOT a.attisdropped;
-- get min/max value arrays
SELECT array_agg(shardminvalue ORDER BY shardid),
array_agg(shardmaxvalue ORDER BY shardid)
INTO partition_min_values, partition_max_values
FROM pg_dist_shard
WHERE logicalrelid=rel;
-- get partition column index and partition method
SELECT (regexp_matches(partkey, ':varattno ([0-9]+)'))[1]::int - 1,
(CASE WHEN partmethod='h' THEN 'hash' ELSE 'range' END)
INTO partition_column_index, partition_method
FROM pg_dist_partition
WHERE logicalrelid=rel;
-- insert into the distributed table
EXECUTE 'INSERT INTO ' || rel::text || ' ' || query;
-- repartition the query locally
SELECT array_agg(rows_written::text ORDER BY partition_index),
array_agg(partition_index) FILTER (WHERE rows_written > 0)
INTO partitioned_results_row_counts,
non_empty_partitions
FROM worker_partition_query_result('test_prefix', query, partition_column_index,
partition_method, partition_min_values,
partition_max_values, binaryCopy);
SELECT array_agg('test_prefix_' || i::text)
INTO partition_result_names
FROM unnest(non_empty_partitions) i;
EXECUTE 'SELECT count(*) FROM ((' || query || ') EXCEPT (SELECT * FROM read_intermediate_results($1,$2) AS res (' || tuple_def || '))) t'
INTO rows_different
USING partition_result_names, (CASE WHEN binaryCopy THEN 'binary' ELSE 'text' END)::pg_catalog.citus_copy_format;
-- commit so results are available in run_command_on_shards
COMMIT;
-- rows per shard of the distributed table
SELECT array_agg(result order by shardid) INTO distributed_table_row_counts
FROM run_command_on_shards(rel, 'SELECT count(*) FROM %s');
IF partitioned_results_row_counts = distributed_table_row_counts THEN
RAISE NOTICE 'Rows per partition match ...';
ELSE
RAISE 'FAILED: rows per partition do not match, expecting % got %', distributed_table_row_counts, partitioned_results_row_counts;
END IF;
IF rows_different = 0 THEN
RAISE NOTICE 'Row values match ...';
ELSE
RAISE 'FAILED: Could not find % of expected rows in partitions', rows_different;
END IF;
RAISE NOTICE 'PASSED.';
END;
$$ LANGUAGE plpgsql;
--
-- Procedure for creating shards for a range partitioned distributed table.
--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
AS $$
DECLARE
new_shardid bigint;
idx int;
BEGIN
FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1))
LOOP
SELECT master_create_empty_shard(rel::text) INTO new_shardid;
UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid;
END LOOP;
END;
$$ LANGUAGE plpgsql;
\set VERBOSITY terse
-- hash partitioning, 32 shards
SET citus.shard_count TO 32;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
create_distributed_table
--------------------------
(1 row)
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- hash partitioning, 1 shard
SET citus.shard_count TO 1;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
create_distributed_table
--------------------------
(1 row)
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- hash partitioning, 17 shards (so hash partitions aren't uniform)
SET citus.shard_count TO 17;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
create_distributed_table
--------------------------
(1 row)
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- hash partitioning, date partition column
SET citus.shard_count TO 8;
CREATE TABLE t(a DATE, b int);
SELECT create_distributed_table('t', 'a');
create_distributed_table
--------------------------
(1 row)
CALL test_partition_query_results('t', 'SELECT (''1985-05-18''::date + (x::text || '' days'')::interval)::date, x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- hash partitioning, int4 range partition column
SET citus.shard_count TO 8;
CREATE TABLE t(a int4range, b int);
SELECT create_distributed_table('t', 'a');
create_distributed_table
--------------------------
(1 row)
CALL test_partition_query_results('t', 'SELECT int4range(x,2*x+10), x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- range partitioning, int partition column
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
create_distributed_table
--------------------------
(1 row)
CALL create_range_partitioned_shards('t', '{0,25,50,76}',
'{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- not covering ranges, should ERROR
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
create_distributed_table
--------------------------
(1 row)
CALL create_range_partitioned_shards('t', '{0,25,50,100}',
'{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
ERROR: could not find shard for partition column value
DROP TABLE t;
-- overlapping ranges, we allow this in range partitioned distributed tables, should be fine
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
create_distributed_table
--------------------------
(1 row)
CALL create_range_partitioned_shards('t', '{0,25,50,76}',
'{50,49,90,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
-- range partitioning, composite partition column
CREATE TYPE composite_key_type AS (f1 int, f2 text);
SET citus.shard_count TO 8;
CREATE TABLE t(key composite_key_type, value int);
SELECT create_distributed_table('t', 'key', 'range');
create_distributed_table
--------------------------
(1 row)
CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}',
'{"(24,z)","(49,z)","(74,z)","(100,z)"}');
CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
DROP TYPE composite_key_type;
-- unsorted ranges
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
create_distributed_table
--------------------------
(1 row)
CALL create_range_partitioned_shards('t', '{50,25,76,0}',
'{75,49,200,24}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
NOTICE: Rows per partition match ...
NOTICE: Row values match ...
NOTICE: PASSED.
DROP TABLE t;
SET client_min_messages TO WARNING;
DROP SCHEMA partitioned_intermediate_results CASCADE;
\set VERBOSITY default
SET client_min_messages TO DEFAULT;
SET citus.shard_count TO DEFAULT;

View File

@ -70,7 +70,7 @@ test: subquery_prepared_statements pg12
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results limit_intermediate_size
test: multi_explain hyperscale_tutorial
test: multi_explain hyperscale_tutorial partitioned_intermediate_results
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure multi_function_in_join row_types materialized_view

View File

@ -0,0 +1,368 @@
-- Test functions for partitioning intermediate results
CREATE SCHEMA partitioned_intermediate_results;
SET search_path TO 'partitioned_intermediate_results';
-- hash partitioned intermediate results
BEGIN;
SELECT * FROM worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash',
'{-2147483648,-1073741824,0,1073741824}'::text[],
'{-1073741825,-1,1073741823,2147483647}'::text[], false);
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int)
ORDER BY x;
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_1', 'text') AS res (x int, x2 int)
ORDER BY x;
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_2', 'text') AS res (x int, x2 int)
ORDER BY x;
SELECT hashint4(x), x, x2 FROM
read_intermediate_result('squares_hash_3', 'text') AS res (x int, x2 int)
ORDER BY x;
END;
-- range partitioned intermediate results
BEGIN;
SELECT * FROM worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, /* partition by x^2 */
'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true /* binary format */);
SELECT x, x2 FROM
read_intermediate_result('squares_range_0', 'binary') AS res (x int, x2 int)
ORDER BY x;
SELECT x, x2 FROM
read_intermediate_result('squares_range_1', 'binary') AS res (x int, x2 int)
ORDER BY x;
SELECT x, x2 FROM
read_intermediate_result('squares_range_2', 'binary') AS res (x int, x2 int)
ORDER BY x;
SELECT x, x2 FROM
read_intermediate_result('squares_range_3', 'binary') AS res (x int, x2 int)
ORDER BY x;
END;
-- 1M rows, just in case. text format.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_hash',
'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'hash',
'{-2147483648,-1073741824,0,1073741824}'::text[],
'{-1073741825,-1,1073741823,2147483647}'::text[], false);
SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_hash_0',
'doubles_hash_1',
'doubles_hash_2',
'doubles_hash_3'], 'text') AS res (x int, x2 int);
END;
-- 1M rows, just in case. binary format.
BEGIN;
SELECT * FROM worker_partition_query_result('doubles_range',
'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'range',
'{0,250001,500001,750001}'::text[],
'{250000,500000,750000,1000000}'::text[], true);
SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_range_0',
'doubles_range_1',
'doubles_range_2',
'doubles_range_3'], 'binary') AS res (x int, x2 int);
END;
--
-- Some error cases
--
-- not allowed outside transaction block
SELECT * FROM worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range', '{0}'::text[], '{20}'::text[], true);
BEGIN;
SAVEPOINT s1;
-- syntax error in query
SELECT worker_partition_query_result('squares_range',
'SELECxT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- invalid result prefix
SELECT worker_partition_query_result('squares_range/a/',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- empty min/max values
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range', ARRAY[]::text[], ARRAY[]::text[], true);
ROLLBACK TO SAVEPOINT s1;
-- append partitioning
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'append',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- query with no results
CREATE TABLE t(a int);
SELECT worker_partition_query_result('squares_range',
'INSERT INTO t VALUES (1), (2)',
1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- negative partition index
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
-1, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- too large partition index
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
2, 'range',
'{0,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- min/max values of different lengths
SELECT worker_partition_query_result('squares_range',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'range',
'{0,21,41,61,101}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- null values in min/max values of hash partitioned results
SELECT worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i',
1, 'hash',
'{NULL,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
-- multiple queries
SELECT worker_partition_query_result('squares_hash',
'SELECT i, i * i FROM generate_series(1, 10) i; SELECT 4, 16;',
1, 'hash',
'{NULL,21,41,61}'::text[],
'{20,40,60,100}'::text[],
true);
ROLLBACK TO SAVEPOINT s1;
ROLLBACK;
--
-- Procedure for conveniently testing worker_partition_query_result(). It uses
-- worker_partition_query_result() to partition the result of a query using the
-- same scheme as the distributed table rel, and then checks whether it
-- partitioned the rows the same way as the shards of rel.
--
CREATE OR REPLACE PROCEDURE test_partition_query_results(rel regclass, query text,
binaryCopy boolean DEFAULT true)
AS $$
DECLARE
partition_min_values text[];
partition_max_values text[];
partition_column_index int;
partition_method citus.distribution_type;
partitioned_results_row_counts text[];
distributed_table_row_counts text[];
tuple_def text;
partition_result_names text[];
non_empty_partitions int[];
rows_different int;
BEGIN
-- get tuple definition
SELECT string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ' ORDER BY a.attnum)
INTO tuple_def
FROM pg_catalog.pg_attribute a
WHERE a.attrelid = rel::oid AND a.attnum > 0 AND NOT a.attisdropped;
-- get min/max value arrays
SELECT array_agg(shardminvalue ORDER BY shardid),
array_agg(shardmaxvalue ORDER BY shardid)
INTO partition_min_values, partition_max_values
FROM pg_dist_shard
WHERE logicalrelid=rel;
-- get partition column index and partition method
SELECT (regexp_matches(partkey, ':varattno ([0-9]+)'))[1]::int - 1,
(CASE WHEN partmethod='h' THEN 'hash' ELSE 'range' END)
INTO partition_column_index, partition_method
FROM pg_dist_partition
WHERE logicalrelid=rel;
-- insert into the distributed table
EXECUTE 'INSERT INTO ' || rel::text || ' ' || query;
-- repartition the query locally
SELECT array_agg(rows_written::text ORDER BY partition_index),
array_agg(partition_index) FILTER (WHERE rows_written > 0)
INTO partitioned_results_row_counts,
non_empty_partitions
FROM worker_partition_query_result('test_prefix', query, partition_column_index,
partition_method, partition_min_values,
partition_max_values, binaryCopy);
SELECT array_agg('test_prefix_' || i::text)
INTO partition_result_names
FROM unnest(non_empty_partitions) i;
EXECUTE 'SELECT count(*) FROM ((' || query || ') EXCEPT (SELECT * FROM read_intermediate_results($1,$2) AS res (' || tuple_def || '))) t'
INTO rows_different
USING partition_result_names, (CASE WHEN binaryCopy THEN 'binary' ELSE 'text' END)::pg_catalog.citus_copy_format;
-- commit so results are available in run_command_on_shards
COMMIT;
-- rows per shard of the distributed table
SELECT array_agg(result order by shardid) INTO distributed_table_row_counts
FROM run_command_on_shards(rel, 'SELECT count(*) FROM %s');
IF partitioned_results_row_counts = distributed_table_row_counts THEN
RAISE NOTICE 'Rows per partition match ...';
ELSE
RAISE 'FAILED: rows per partition do not match, expecting % got %', distributed_table_row_counts, partitioned_results_row_counts;
END IF;
IF rows_different = 0 THEN
RAISE NOTICE 'Row values match ...';
ELSE
RAISE 'FAILED: Could not find % of expected rows in partitions', rows_different;
END IF;
RAISE NOTICE 'PASSED.';
END;
$$ LANGUAGE plpgsql;
--
-- Procedure for creating shards for a range partitioned distributed table.
--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
AS $$
DECLARE
new_shardid bigint;
idx int;
BEGIN
FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1))
LOOP
SELECT master_create_empty_shard(rel::text) INTO new_shardid;
UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid;
END LOOP;
END;
$$ LANGUAGE plpgsql;
\set VERBOSITY terse
-- hash partitioning, 32 shards
SET citus.shard_count TO 32;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
DROP TABLE t;
-- hash partitioning, 1 shard
SET citus.shard_count TO 1;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
DROP TABLE t;
-- hash partitioning, 17 shards (so hash partitions aren't uniform)
SET citus.shard_count TO 17;
CREATE TABLE t(a int, b int);
SELECT create_distributed_table('t', 'a');
CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x');
DROP TABLE t;
-- hash partitioning, date partition column
SET citus.shard_count TO 8;
CREATE TABLE t(a DATE, b int);
SELECT create_distributed_table('t', 'a');
CALL test_partition_query_results('t', 'SELECT (''1985-05-18''::date + (x::text || '' days'')::interval)::date, x * x FROM generate_series(1, 100) x');
DROP TABLE t;
-- hash partitioning, int4 range partition column
SET citus.shard_count TO 8;
CREATE TABLE t(a int4range, b int);
SELECT create_distributed_table('t', 'a');
CALL test_partition_query_results('t', 'SELECT int4range(x,2*x+10), x * x FROM generate_series(1, 100) x');
DROP TABLE t;
-- range partitioning, int partition column
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
CALL create_range_partitioned_shards('t', '{0,25,50,76}',
'{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
DROP TABLE t;
-- not covering ranges, should ERROR
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
CALL create_range_partitioned_shards('t', '{0,25,50,100}',
'{24,49,75,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
DROP TABLE t;
-- overlapping ranges, we allow this in range partitioned distributed tables, should be fine
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
CALL create_range_partitioned_shards('t', '{0,25,50,76}',
'{50,49,90,200}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
DROP TABLE t;
-- range partitioning, composite partition column
CREATE TYPE composite_key_type AS (f1 int, f2 text);
SET citus.shard_count TO 8;
CREATE TABLE t(key composite_key_type, value int);
SELECT create_distributed_table('t', 'key', 'range');
CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}',
'{"(24,z)","(49,z)","(74,z)","(100,z)"}');
CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x');
DROP TABLE t;
DROP TYPE composite_key_type;
-- unsorted ranges
CREATE TABLE t(key int, value int);
SELECT create_distributed_table('t', 'key', 'range');
CALL create_range_partitioned_shards('t', '{50,25,76,0}',
'{75,49,200,24}');
CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x');
DROP TABLE t;
SET client_min_messages TO WARNING;
DROP SCHEMA partitioned_intermediate_results CASCADE;
\set VERBOSITY default
SET client_min_messages TO DEFAULT;
SET citus.shard_count TO DEFAULT;