diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index e81f3ef3d..adbf2e68f 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -92,7 +92,6 @@ static void EnsureTableCanBeColocatedWith(Oid relationId,
 										  char replicationModel,
 										  Oid sourceRelationId);
 static void EnsureLocalTableEmpty(Oid relationId);
 static void EnsureTableNotDistributed(Oid relationId);
-static char LookupDistributionMethod(Oid distributionMethodOid);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
 									int16 supportFunctionNumber);
 static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
@@ -936,7 +935,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
  *
  * The passed in oid has to belong to a value of citus.distribution_type.
  */
-static char
+char
 LookupDistributionMethod(Oid distributionMethodOid)
 {
 	char distributionMethod = 0;
diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index b8d3c2c62..6dc0438ce 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -90,9 +90,7 @@ static void SendCopyDataOverConnection(StringInfo dataBuffer,
 static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver);
 static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
-static char * CreateIntermediateResultsDirectory(void);
 static char * IntermediateResultsDirectory(void);
-static char * QueryResultFileName(const char *resultId);
 static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
 												  char *copyFormat,
 												  Datum *resultIdArray,
@@ -556,7 +554,7 @@ ReceiveQueryResultViaCopy(const char *resultId)
  * directory for the current transaction if it does not exist and ensures
  * that the directory is removed at the end of the transaction.
  */
-static char *
+char *
 CreateIntermediateResultsDirectory(void)
 {
 	char *resultDirectory = IntermediateResultsDirectory();
@@ -591,7 +589,7 @@ CreateIntermediateResultsDirectory(void)
  * an intermediate result with the given key in the per transaction
  * result directory.
  */
-static char *
+char *
 QueryResultFileName(const char *resultId)
 {
 	StringInfo resultFileName = makeStringInfo();
diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c
new file mode 100644
index 000000000..a111a68fb
--- /dev/null
+++ b/src/backend/distributed/executor/partitioned_intermediate_results.c
@@ -0,0 +1,519 @@
+/*-------------------------------------------------------------------------
+ *
+ * partitioned_intermediate_results.c
+ *   Functions for writing partitioned intermediate results.
+ *
+ * Copyright (c), Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "port.h"
+
+#include "access/nbtree.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_type.h"
+#include "distributed/intermediate_results.h"
+#include "distributed/master_metadata_utility.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_executor.h"
+#include "distributed/pg_dist_shard.h"
+#include "distributed/remote_commands.h"
+#include "distributed/tuplestore.h"
+#include "distributed/worker_protocol.h"
+#include "nodes/makefuncs.h"
+#include "nodes/primnodes.h"
+#include "tcop/pquery.h"
+#include "tcop/tcopprot.h"
+#include "utils/typcache.h"
+
+
+/*
+ * PartitionedResultDestReceiver is used for streaming tuples into a set of
+ * partitioned result files.
+ */
+typedef struct PartitionedResultDestReceiver
+{
+	/* public DestReceiver interface */
+	DestReceiver pub;
+
+	/* partition file i is stored in a file named $resultIdPrefix_$i */
+	char *resultIdPrefix;
+
+	/* use binary copy or just text copy format? */
+	bool binaryCopy;
+
+	/* used for deciding which partition a tuple belongs to */
+	DistTableCacheEntry *shardSearchInfo;
+
+	MemoryContext perTupleContext;
+
+	/* what do the streamed tuples look like? */
+	TupleDesc tupleDescriptor;
+
+	/* which column of streamed tuples to use as partition column? */
+	int partitionColumnIndex;
+
+	/* how many partitions do we have? */
+	int partitionCount;
+
+	/*
+	 * Tuples for partition[i] are sent to partitionDestReceivers[i], which
+	 * writes them to a result file.
+	 */
+	DestReceiver **partitionDestReceivers;
+} PartitionedResultDestReceiver;
+
+static Portal StartPortalForQueryExecution(const char *queryString);
+static DistTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray,
+													   ArrayType *maxValuesArray,
+													   char partitionMethod,
+													   Var *partitionColumn);
+static PartitionedResultDestReceiver * CreatePartitionedResultDestReceiver(
+	char *resultIdPrefix, int partitionColumnIndex, int partitionCount,
+	TupleDesc tupleDescriptor, bool binaryCopy,
+	DistTableCacheEntry *shardSearchInfo, MemoryContext perTupleContext);
+static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation,
+												 TupleDesc inputTupleDescriptor);
+static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot,
+												 DestReceiver *dest);
+static void PartitionedResultDestReceiverShutdown(DestReceiver *destReceiver);
+static void PartitionedResultDestReceiverDestroy(DestReceiver *destReceiver);
+
+/* exports for SQL callable functions */
+PG_FUNCTION_INFO_V1(worker_partition_query_result);
+
+
+/*
+ * worker_partition_query_result executes a query and writes the results into a
+ * set of local files according to the partition scheme and the partition column.
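+ *
+ * For example (an illustrative invocation; the prefix, query, and ranges are
+ * made up here, and the call must run inside a transaction block):
+ *
+ *   BEGIN;
+ *   SELECT * FROM worker_partition_query_result(
+ *     'example_prefix', 'SELECT i, i * i FROM generate_series(1, 10) i',
+ *     0, 'hash',
+ *     '{-2147483648,0}'::text[], '{-1,2147483647}'::text[], false);
+ *
+ * This writes files named example_prefix_0 and example_prefix_1, and returns
+ * one (partition_index, rows_written, bytes_written) row per partition.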
+ */
+Datum
+worker_partition_query_result(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	text *resultIdPrefixText = PG_GETARG_TEXT_P(0);
+	char *resultIdPrefixString = text_to_cstring(resultIdPrefixText);
+
+	/* verify that resultIdPrefix doesn't contain invalid characters */
+	QueryResultFileName(resultIdPrefixString);
+
+	text *queryText = PG_GETARG_TEXT_P(1);
+	char *queryString = text_to_cstring(queryText);
+
+	int partitionColumnIndex = PG_GETARG_INT32(2);
+	Oid partitionMethodOid = PG_GETARG_OID(3);
+
+	char partitionMethod = LookupDistributionMethod(partitionMethodOid);
+	if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_RANGE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("only hash and range partitioning schemes are supported")));
+	}
+
+	ArrayType *minValuesArray = PG_GETARG_ARRAYTYPE_P(4);
+	int32 minValuesCount = ArrayObjectCount(minValuesArray);
+
+	ArrayType *maxValuesArray = PG_GETARG_ARRAYTYPE_P(5);
+	int32 maxValuesCount = ArrayObjectCount(maxValuesArray);
+
+	bool binaryCopy = PG_GETARG_BOOL(6);
+
+	CheckCitusVersion(ERROR);
+
+	if (!IsMultiStatementTransaction())
+	{
+		ereport(ERROR, (errmsg("worker_partition_query_result can only be used in a "
+							   "transaction block")));
+	}
+
+	/*
+	 * Make sure that this transaction has a distributed transaction ID.
+	 *
+	 * Intermediate results will be stored in a directory that is derived
+	 * from the distributed transaction ID.
+	 */
+	UseCoordinatedTransaction();
+
+	CreateIntermediateResultsDirectory();
+
+	if (minValuesCount != maxValuesCount)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg(
+					 "min values and max values must have the same number of elements")));
+	}
+
+	int partitionCount = minValuesCount;
+	if (partitionCount == 0)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("number of partitions cannot be 0")));
+	}
+
+	/* start execution early in order to extract the tuple descriptor */
+	Portal portal = StartPortalForQueryExecution(queryString);
+
+	/* extract the partition column */
+	TupleDesc tupleDescriptor = portal->tupDesc;
+	if (tupleDescriptor == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("query must generate a set of rows")));
+	}
+
+	if (partitionColumnIndex < 0 || partitionColumnIndex >= tupleDescriptor->natts)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("partition column index must be between 0 and %d",
+							   tupleDescriptor->natts - 1)));
+	}
+
+	FormData_pg_attribute *partitionColumnAttr = TupleDescAttr(tupleDescriptor,
+															   partitionColumnIndex);
+	Var *partitionColumn = makeVar(partitionColumnIndex, partitionColumnIndex,
+								   partitionColumnAttr->atttypid,
+								   partitionColumnAttr->atttypmod,
+								   partitionColumnAttr->attcollation, 0);
+
+	/* construct an artificial DistTableCacheEntry for shard pruning */
+	DistTableCacheEntry *shardSearchInfo =
+		QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,
+								  partitionMethod, partitionColumn);
+
+	/* prepare the output destination */
+	EState *estate = CreateExecutorState();
+	MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
+	PartitionedResultDestReceiver *dest =
+		CreatePartitionedResultDestReceiver(resultIdPrefixString, partitionColumnIndex,
+											partitionCount, tupleDescriptor, binaryCopy,
+											shardSearchInfo, tupleContext);
+
+	/* execute the query */
+	PortalRun(portal, FETCH_ALL, false, true, (DestReceiver *) dest,
+			  (DestReceiver *) dest, NULL);
+
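+	/*
+	 * PortalRun has now streamed every tuple through the partitioned dest
+	 * receiver, so each per-partition file receiver holds the final row and
+	 * byte counts for its result file.
+	 */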
+ /* construct the output result */ + TupleDesc returnTupleDesc = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &returnTupleDesc); + resultInfo->returnMode = SFRM_Materialize; + resultInfo->setResult = tupleStore; + resultInfo->setDesc = returnTupleDesc; + + for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + uint64 recordsWritten = 0; + uint64 bytesWritten = 0; + Datum values[3]; + bool nulls[3]; + + if (dest->partitionDestReceivers[partitionIndex] != NULL) + { + FileDestReceiverStats(dest->partitionDestReceivers[partitionIndex], + &recordsWritten, &bytesWritten); + } + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(partitionIndex); + values[1] = UInt64GetDatum(recordsWritten); + values[2] = UInt64GetDatum(bytesWritten); + + tuplestore_putvalues(tupleStore, returnTupleDesc, values, nulls); + } + + tuplestore_donestoring(tupleStore); + PortalDrop(portal, false); + FreeExecutorState(estate); + + PG_RETURN_INT64(1); +} + + +/* + * StartPortalForQueryExecution creates and starts a portal which can be + * used for running the given query. + */ +static Portal +StartPortalForQueryExecution(const char *queryString) +{ + Query *query = ParseQueryString(queryString, NULL, 0); + + int cursorOptions = CURSOR_OPT_PARALLEL_OK; + PlannedStmt *queryPlan = pg_plan_query(query, cursorOptions, NULL); + + Portal portal = CreateNewPortal(); + + /* don't display the portal in pg_cursors, it is for internal use only */ + portal->visible = false; + + PortalDefineQuery(portal, NULL, queryString, "SELECT", list_make1(queryPlan), NULL); + int eflags = 0; + PortalStart(portal, NULL, eflags, GetActiveSnapshot()); + + return portal; +} + + +/* + * QueryTupleShardSearchInfo returns a DistTableCacheEntry which has enough + * information so that FindShardInterval() can find the shard corresponding + * to a tuple. 
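+ *
+ * The returned entry is synthetic rather than a real metadata cache entry:
+ * its logicalrelid is InvalidOid and each "shard" stands for one partition,
+ * with shardIndex set to the index of the partition that matching tuples
+ * should be routed to.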
+ */ +static DistTableCacheEntry * +QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, + char partitionMethod, Var *partitionColumn) +{ + Datum *minValues = 0; + Datum *maxValues = 0; + bool *minValueNulls = 0; + bool *maxValueNulls = 0; + int minValuesCount = 0; + int maxValuesCount = 0; + Oid intervalTypeId = InvalidOid; + int32 intervalTypeMod = 0; + deconstruct_array(minValuesArray, TEXTOID, -1, false, 'i', &minValues, + &minValueNulls, &minValuesCount); + deconstruct_array(maxValuesArray, TEXTOID, -1, false, 'i', &maxValues, + &maxValueNulls, &maxValuesCount); + int partitionCount = minValuesCount; + Assert(maxValuesCount == partitionCount); + + GetIntervalTypeInfo(partitionMethod, partitionColumn, + &intervalTypeId, &intervalTypeMod); + FmgrInfo *shardColumnCompare = GetFunctionInfo(partitionColumn->vartype, + BTREE_AM_OID, BTORDER_PROC); + FmgrInfo *shardIntervalCompare = GetFunctionInfo(intervalTypeId, + BTREE_AM_OID, BTORDER_PROC); + FmgrInfo *hashFunction = NULL; + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + TypeCacheEntry *typeEntry = lookup_type_cache(partitionColumn->vartype, + TYPECACHE_HASH_PROC_FINFO); + + hashFunction = palloc0(sizeof(FmgrInfo)); + fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CurrentMemoryContext); + } + + ShardInterval **shardIntervalArray = palloc0(partitionCount * + sizeof(ShardInterval *)); + for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + Datum datumArray[Natts_pg_dist_shard] = { + [Anum_pg_dist_shard_logicalrelid - 1] = InvalidOid, + [Anum_pg_dist_shard_shardid - 1] = partitionIndex, + [Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL, + [Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex], + [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex] + }; + bool nullsArray[Natts_pg_dist_shard] = { + [Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex], + [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex] + }; + + shardIntervalArray[partitionIndex] = + DeformedDistShardTupleToShardInterval(datumArray, nullsArray, + intervalTypeId, intervalTypeMod); + shardIntervalArray[partitionIndex]->shardIndex = partitionIndex; + } + + DistTableCacheEntry *result = palloc0(sizeof(DistTableCacheEntry)); + result->partitionMethod = partitionMethod; + result->partitionColumn = partitionColumn; + result->shardIntervalCompareFunction = shardIntervalCompare; + result->shardColumnCompareFunction = shardColumnCompare; + result->hashFunction = hashFunction; + result->sortedShardIntervalArray = + SortShardIntervalArray(shardIntervalArray, partitionCount, + partitionColumn->varcollid, shardIntervalCompare); + result->hasUninitializedShardInterval = + HasUninitializedShardInterval(result->sortedShardIntervalArray, partitionCount); + result->hasOverlappingShardInterval = + result->hasUninitializedShardInterval || + HasOverlappingShardInterval(result->sortedShardIntervalArray, partitionCount, + partitionColumn->varcollid, shardIntervalCompare); + ErrorIfInconsistentShardIntervals(result); + + result->shardIntervalArrayLength = partitionCount; + + return result; +} + + +/* + * CreatePartitionedResultDestReceiver sets up a partitioned dest receiver. 
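+ *
+ * The partitionDestReceivers array starts out all-NULL; per-partition file
+ * receivers are created lazily in PartitionedResultDestReceiverReceive()
+ * when a partition receives its first tuple, so partitions that stay empty
+ * never create a file.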
+ */ +static PartitionedResultDestReceiver * +CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnIndex, + int partitionCount, TupleDesc tupleDescriptor, + bool binaryCopy, DistTableCacheEntry *shardSearchInfo, + MemoryContext perTupleContext) +{ + PartitionedResultDestReceiver *resultDest = + palloc0(sizeof(PartitionedResultDestReceiver)); + + /* set up the DestReceiver function pointers */ + resultDest->pub.receiveSlot = PartitionedResultDestReceiverReceive; + resultDest->pub.rStartup = PartitionedResultDestReceiverStartup; + resultDest->pub.rShutdown = PartitionedResultDestReceiverShutdown; + resultDest->pub.rDestroy = PartitionedResultDestReceiverDestroy; + resultDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + resultDest->resultIdPrefix = resultIdPrefix; + resultDest->perTupleContext = perTupleContext; + resultDest->partitionColumnIndex = partitionColumnIndex; + resultDest->partitionCount = partitionCount; + resultDest->shardSearchInfo = shardSearchInfo; + resultDest->tupleDescriptor = tupleDescriptor; + resultDest->binaryCopy = binaryCopy; + resultDest->partitionDestReceivers = + (DestReceiver **) palloc0(partitionCount * sizeof(DestReceiver *)); + + return resultDest; +} + + +/* + * PartitionedResultDestReceiverStartup implements the rStartup interface of + * PartitionedResultDestReceiver. + */ +static void +PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation, + TupleDesc inputTupleDescriptor) +{ + /* + * We don't expect this to be called multiple times, but if it happens, + * we will just overwrite previous files. + */ + PartitionedResultDestReceiver *partitionedDest = + (PartitionedResultDestReceiver *) copyDest; + int partitionCount = partitionedDest->partitionCount; + for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + DestReceiver *partitionDest = + partitionedDest->partitionDestReceivers[partitionIndex]; + if (partitionDest != NULL) + { + partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor); + } + } +} + + +/* + * PartitionedResultDestReceiverReceive implements the receiveSlot interface of + * PartitionedResultDestReceiver. 
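+ *
+ * Each tuple is routed by passing its partition column value to
+ * FindShardInterval() over the synthetic shard search info; a NULL partition
+ * column value, or a value not covered by any partition range, raises an
+ * error.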
+ */ +static bool +PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest) +{ + PartitionedResultDestReceiver *partitionedDest = + (PartitionedResultDestReceiver *) copyDest; + + slot_getallattrs(slot); + + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + + if (columnNulls[partitionedDest->partitionColumnIndex]) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("the partition column value cannot be NULL"))); + } + + Datum partitionColumnValue = columnValues[partitionedDest->partitionColumnIndex]; + ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, + partitionedDest->shardSearchInfo); + if (shardInterval == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find shard for partition column " + "value"))); + } + + int partitionIndex = shardInterval->shardIndex; + DestReceiver *partitionDest = partitionedDest->partitionDestReceivers[partitionIndex]; + if (partitionDest == NULL) + { + StringInfo resultId = makeStringInfo(); + appendStringInfo(resultId, "%s_%d", partitionedDest->resultIdPrefix, + partitionIndex); + char *filePath = QueryResultFileName(resultId->data); + + partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext, + partitionedDest->binaryCopy); + partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest; + partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor); + } + + partitionDest->receiveSlot(slot, partitionDest); + + return true; +} + + +/* + * PartitionedResultDestReceiverShutdown implements the rShutdown interface of + * PartitionedResultDestReceiver. + */ +static void +PartitionedResultDestReceiverShutdown(DestReceiver *copyDest) +{ + PartitionedResultDestReceiver *partitionedDest = + (PartitionedResultDestReceiver *) copyDest; + int partitionCount = partitionedDest->partitionCount; + for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + DestReceiver *partitionDest = + partitionedDest->partitionDestReceivers[partitionIndex]; + if (partitionDest != NULL) + { + partitionDest->rShutdown(partitionDest); + } + } +} + + +/* + * PartitionedResultDestReceiverDestroy implements the rDestroy interface of + * PartitionedResultDestReceiver. 
+ */
+static void
+PartitionedResultDestReceiverDestroy(DestReceiver *copyDest)
+{
+	PartitionedResultDestReceiver *partitionedDest =
+		(PartitionedResultDestReceiver *) copyDest;
+	int partitionCount = partitionedDest->partitionCount;
+	for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
+	{
+		DestReceiver *partitionDest =
+			partitionedDest->partitionDestReceivers[partitionIndex];
+		if (partitionDest != NULL)
+		{
+			/* this call should also free partitionDest, so no need to free it after */
+			partitionDest->rDestroy(partitionDest);
+		}
+	}
+
+	pfree(partitionedDest->partitionDestReceivers);
+	pfree(partitionedDest);
+}
diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index df1d0625e..68a6a7cba 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -194,15 +194,9 @@ static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
 static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
 static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
 static void PrepareWorkerNodeCache(void);
-static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
-										  int shardCount);
 static bool CheckInstalledVersion(int elevel);
 static char * AvailableExtensionVersion(void);
 static char * InstalledExtensionVersion(void);
-static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
-										int shardIntervalArrayLength,
-										Oid shardIntervalCollation,
-										FmgrInfo *shardIntervalSortCompareFunction);
 static bool CitusHasBeenLoadedInternal(void);
 static void InitializeCaches(void);
 static void InitializeDistCache(void);
@@ -1245,22 +1239,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
 			cacheEntry->hasOverlappingShardInterval = true;
 		}
 
-		/*
-		 * If table is hash-partitioned and has shards, there never should be
-		 * any uninitalized shards. Historically we've not prevented that for
-		 * range partitioned tables, but it might be a good idea to start
-		 * doing so.
-		 */
-		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
-			cacheEntry->hasUninitializedShardInterval)
-		{
-			ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards")));
-		}
-		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
-			cacheEntry->hasOverlappingShardInterval)
-		{
-			ereport(ERROR, (errmsg("hash partitioned table has overlapping shards")));
-		}
+		ErrorIfInconsistentShardIntervals(cacheEntry);
 	}
 
 	/*
@@ -1330,6 +1309,31 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
 }
 
 
+/*
+ * ErrorIfInconsistentShardIntervals checks if shard intervals are consistent with
+ * our expectations.
+ */
+void
+ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry)
+{
+	/*
+	 * If table is hash-partitioned and has shards, there should never be any
+	 * uninitialized shards. Historically we've not prevented that for range
+	 * partitioned tables, but it might be a good idea to start doing so.
+ */ + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->hasUninitializedShardInterval) + { + ereport(ERROR, (errmsg("hash partitioned table has uninitialized shards"))); + } + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->hasOverlappingShardInterval) + { + ereport(ERROR, (errmsg("hash partitioned table has overlapping shards"))); + } +} + + /* * HasUniformHashDistribution determines whether the given list of sorted shards * has a uniform hash distribution, as produced by master_create_worker_shards for @@ -1376,7 +1380,7 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray, * ensure that input shard interval array is sorted on shardminvalue and uninitialized * shard intervals are at the end of the array. */ -static bool +bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount) { bool hasUninitializedShardInterval = false; @@ -1406,7 +1410,7 @@ HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shar * HasOverlappingShardInterval determines whether the given list of sorted * shards has overlapping ranges. */ -static bool +bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, int shardIntervalArrayLength, Oid shardIntervalCollation, @@ -3682,8 +3686,9 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Var *partitionColumn = (Var *) partitionNode; Assert(IsA(partitionNode, Var)); - *intervalTypeId = partitionColumn->vartype; - *intervalTypeMod = partitionColumn->vartypmod; + GetIntervalTypeInfo(partitionMethod, partitionColumn, + intervalTypeId, intervalTypeMod); + *columnTypeId = partitionColumn->vartype; *columnTypeMod = partitionColumn->vartypmod; break; @@ -3695,7 +3700,9 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, Var *partitionColumn = (Var *) partitionNode; Assert(IsA(partitionNode, Var)); - *intervalTypeId = INT4OID; + GetIntervalTypeInfo(partitionMethod, partitionColumn, + intervalTypeId, intervalTypeMod); + *columnTypeId = partitionColumn->vartype; *columnTypeMod = partitionColumn->vartypmod; break; @@ -3716,6 +3723,42 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod, } +/* + * GetIntervalTypeInfo gets type id and type mod of the min/max values + * of shard intervals for a distributed table with given partition method + * and partition column. + */ +void +GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, + Oid *intervalTypeId, int32 *intervalTypeMod) +{ + *intervalTypeId = InvalidOid; + *intervalTypeMod = -1; + + switch (partitionMethod) + { + case DISTRIBUTE_BY_APPEND: + case DISTRIBUTE_BY_RANGE: + { + *intervalTypeId = partitionColumn->vartype; + *intervalTypeMod = partitionColumn->vartypmod; + break; + } + + case DISTRIBUTE_BY_HASH: + { + *intervalTypeId = INT4OID; + break; + } + + default: + { + break; + } + } +} + + /* * TupleToShardInterval transforms the specified dist_shard tuple into a new * ShardInterval using the provided descriptor and partition type information. @@ -3725,10 +3768,33 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, int32 intervalTypeMod) { - Oid inputFunctionId = InvalidOid; - Oid typeIoParam = InvalidOid; Datum datumArray[Natts_pg_dist_shard]; bool isNullArray[Natts_pg_dist_shard]; + + /* + * We use heap_deform_tuple() instead of heap_getattr() to expand tuple + * to contain missing values when ALTER TABLE ADD COLUMN happens. 
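+ *
+ * The actual conversion now lives in DeformedDistShardTupleToShardInterval()
+ * so that callers such as worker_partition_query_result() can build
+ * ShardIntervals from datum arrays they construct by hand, without a heap
+ * tuple.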
+ */
+	heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
+
+	ShardInterval *shardInterval =
+		DeformedDistShardTupleToShardInterval(datumArray, isNullArray,
+											  intervalTypeId, intervalTypeMod);
+
+	return shardInterval;
+}
+
+
+/*
+ * DeformedDistShardTupleToShardInterval transforms the specified deformed
+ * pg_dist_shard tuple into a new ShardInterval.
+ */
+ShardInterval *
+DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
+									  Oid intervalTypeId, int32 intervalTypeMod)
+{
+	Oid inputFunctionId = InvalidOid;
+	Oid typeIoParam = InvalidOid;
 	Datum minValue = 0;
 	Datum maxValue = 0;
 	bool minValueExists = false;
@@ -3738,14 +3804,8 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid
 	char intervalAlign = '0';
 	char intervalDelim = '0';
 
-	/*
-	 * We use heap_deform_tuple() instead of heap_getattr() to expand tuple
-	 * to contain missing values when ALTER TABLE ADD COLUMN happens.
-	 */
-	heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
-
-	Oid relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid -
-												 1]);
+	Oid relationId =
+		DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]);
 	int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
 	char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
 	Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
diff --git a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql
index 79a04e608..57870e092 100644
--- a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql
+++ b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql
@@ -1,5 +1,6 @@
 #include "udfs/read_intermediate_results/9.2-1.sql"
 #include "udfs/fetch_intermediate_results/9.2-1.sql"
+#include "udfs/worker_partition_query_result/9.2-1.sql"
 ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid;
 UPDATE pg_catalog.pg_dist_colocation dc
 SET distributioncolumncollation = t.typcollation
diff --git a/src/backend/distributed/sql/udfs/worker_partition_query_result/9.2-1.sql b/src/backend/distributed/sql/udfs/worker_partition_query_result/9.2-1.sql
new file mode 100644
index 000000000..bda8384fb
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/worker_partition_query_result/9.2-1.sql
@@ -0,0 +1,16 @@
+CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result(
+    result_prefix text,
+    query text,
+    partition_column_index int,
+    partition_method citus.distribution_type,
+    partition_min_values text[],
+    partition_max_values text[],
+    binaryCopy boolean,
+    OUT partition_index int,
+    OUT rows_written bigint,
+    OUT bytes_written bigint)
+RETURNS SETOF record
+LANGUAGE C STRICT VOLATILE
+AS 'MODULE_PATHNAME', $$worker_partition_query_result$$;
+COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean)
+IS 'executes a query and partitions its results into a set of local result files';
diff --git a/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql b/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql
new file mode 100644
index 000000000..bda8384fb
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/worker_partition_query_result/latest.sql
@@ -0,0 +1,16 @@
+CREATE OR REPLACE FUNCTION pg_catalog.worker_partition_query_result(
+    result_prefix text,
+    query text,
+    partition_column_index int,
+    partition_method citus.distribution_type,
+    partition_min_values text[],
+    partition_max_values text[],
+    binaryCopy boolean,
+    OUT partition_index int,
+    OUT rows_written bigint,
+    OUT bytes_written bigint)
+RETURNS SETOF record
+LANGUAGE C STRICT VOLATILE
+AS 'MODULE_PATHNAME', $$worker_partition_query_result$$;
+COMMENT ON FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean)
+IS 'executes a query and partitions its results into a set of local result files';
diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c
index 9ccd5a9a5..46ef8f147 100644
--- a/src/backend/distributed/worker/worker_sql_task_protocol.c
+++ b/src/backend/distributed/worker/worker_sql_task_protocol.c
@@ -24,6 +24,8 @@
 /* necessary to get S_IRUSR, S_IWUSR definitions on illumos */
 #include <sys/stat.h>
 
+#define COPY_BUFFER_SIZE (4 * 1024 * 1024)
+
 /* TaskFileDestReceiver can be used to stream results into a file */
 typedef struct TaskFileDestReceiver
 {
@@ -33,8 +35,8 @@ typedef struct TaskFileDestReceiver
 	/* descriptor of the tuples that are sent to the worker */
 	TupleDesc tupleDescriptor;
 
-	/* EState for per-tuple memory allocation */
-	EState *executorState;
+	/* context for per-tuple memory allocation */
+	MemoryContext tupleContext;
 
 	/* MemoryContext for DestReceiver session */
 	MemoryContext memoryContext;
@@ -48,13 +50,12 @@ typedef struct TaskFileDestReceiver
 	CopyOutState copyOutState;
 	FmgrInfo *columnOutputFunctions;
 
-	/* number of tuples sent */
+	/* statistics */
 	uint64 tuplesSent;
+	uint64 bytesSent;
 } TaskFileDestReceiver;
 
-static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState,
-												 bool binaryCopyFormat);
 static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
 										TupleDesc inputTupleDescriptor);
 static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
@@ -102,9 +103,10 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
 	ParamListInfo paramListInfo = NULL;
 
 	EState *estate = CreateExecutorState();
+	MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
 	TaskFileDestReceiver *taskFileDest =
-		(TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate,
-															binaryCopyFormat);
+		(TaskFileDestReceiver *) CreateFileDestReceiver(taskFilename, tupleContext,
+														binaryCopyFormat);
 
 	ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
 
@@ -118,11 +120,11 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
 
 
 /*
- * CreateTaskFileDestReceiver creates a DestReceiver for writing query results
- * to a task file.
+ * CreateFileDestReceiver creates a DestReceiver for writing query results
+ * to a file.
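+ *
+ * Rows are accumulated in an in-memory COPY buffer that is flushed to the
+ * file once it exceeds COPY_BUFFER_SIZE (and flushed once more at shutdown),
+ * rather than being written row by row. The caller provides the per-tuple
+ * memory context, typically the per-tuple context of an EState.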
*/ -static DestReceiver * -CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat) +DestReceiver * +CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCopyFormat) { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) palloc0( sizeof(TaskFileDestReceiver)); @@ -135,7 +137,7 @@ CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCop taskFileDest->pub.mydest = DestCopyOut; /* set up output parameters */ - taskFileDest->executorState = executorState; + taskFileDest->tupleContext = tupleContext; taskFileDest->memoryContext = CurrentMemoryContext; taskFileDest->filePath = pstrdup(filePath); taskFileDest->binaryCopyFormat = binaryCopyFormat; @@ -173,7 +175,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, copyOutState->null_print_client = (char *) nullPrintCharacter; copyOutState->binary = taskFileDest->binaryCopyFormat; copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState); + copyOutState->rowcontext = taskFileDest->tupleContext; taskFileDest->copyOutState = copyOutState; taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, @@ -187,10 +189,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, if (copyOutState->binary) { /* write headers when using binary encoding */ - resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryHeaders(copyOutState); - - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); } MemoryContextSwitchTo(oldContext); @@ -214,8 +213,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) StringInfo copyData = copyOutState->fe_msgbuf; - EState *executorState = taskFileDest->executorState; - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext executorTupleContext = taskFileDest->tupleContext; MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); slot_getallattrs(slot); @@ -223,19 +221,21 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - resetStringInfo(copyData); - /* construct row in COPY format */ AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, copyOutState, columnOutputFunctions, NULL); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); + if (copyData->len > COPY_BUFFER_SIZE) + { + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); + resetStringInfo(copyData); + } MemoryContextSwitchTo(oldContext); taskFileDest->tuplesSent++; - ResetPerTupleExprContext(executorState); + MemoryContextReset(executorTupleContext); return true; } @@ -254,6 +254,8 @@ WriteToLocalFile(StringInfo copyData, TaskFileDestReceiver *taskFileDest) ereport(ERROR, (errcode_for_file_access(), errmsg("could not append to file: %m"))); } + + taskFileDest->bytesSent += bytesWritten; } @@ -268,12 +270,18 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver) TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; CopyOutState copyOutState = taskFileDest->copyOutState; + if (copyOutState->fe_msgbuf->len > 0) + { + WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); + resetStringInfo(copyOutState->fe_msgbuf); + } + if (copyOutState->binary) { /* write footers when using binary encoding */ - resetStringInfo(copyOutState->fe_msgbuf); AppendCopyBinaryFooters(copyOutState); WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); + 
resetStringInfo(copyOutState->fe_msgbuf); } FileClose(taskFileDest->fileCompat.fd); @@ -302,3 +310,15 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver) pfree(taskFileDest->filePath); pfree(taskFileDest); } + + +/* + * FileDestReceiverStats returns statistics for the given file dest receiver. + */ +void +FileDestReceiverStats(DestReceiver *dest, uint64 *rowsSent, uint64 *bytesSent) +{ + TaskFileDestReceiver *fileDestReceiver = (TaskFileDestReceiver *) dest; + *rowsSent = fileDestReceiver->tuplesSent; + *bytesSent = fileDestReceiver->bytesSent; +} diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 136a8c541..1e5985d60 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -29,6 +29,8 @@ extern void SendQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId); extern void RemoveIntermediateResultsDirectory(void); extern int64 IntermediateResultSize(char *resultId); +extern char * QueryResultFileName(const char *resultId); +extern char * CreateIntermediateResultsDirectory(void); #endif /* INTERMEDIATE_RESULTS_H */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 789ccd142..2cdbce21e 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -155,6 +155,11 @@ extern Datum StringToDatum(char *inputString, Oid dataType); extern char * DatumToString(Datum datum, Oid dataType); extern int CompareShardPlacementsByWorker(const void *leftElement, const void *rightElement); - +extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray, + bool *isNullArray, + Oid intervalTypeId, + int32 intervalTypeMod); +extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, + Oid *intervalTypeId, int32 *intervalTypeMod); #endif /* MASTER_METADATA_UTILITY_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 0af8434e6..7f2bc2077 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -140,13 +140,20 @@ extern void InvalidateMetadataSystemCache(void); extern Datum DistNodeMetadata(void); extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength); +extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, + int shardCount); +extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength, + Oid shardIntervalCollation, + FmgrInfo *shardIntervalSortCompareFunction); extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); extern bool CheckAvailableVersion(int elevel); -bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); - +extern bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); +extern void ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry); extern void EnsureModificationsCanRun(void); +extern char LookupDistributionMethod(Oid distributionMethodOid); /* access WorkerNodeHash */ extern HTAB * GetWorkerNodeHash(void); diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h index 8ac5c9999..cbb15dd37 100644 --- a/src/include/distributed/pg_dist_shard.h +++ b/src/include/distributed/pg_dist_shard.h @@ -58,6 +58,7 @@ typedef FormData_pg_dist_shard 
*Form_pg_dist_shard; #define SHARD_STORAGE_FOREIGN 'f' #define SHARD_STORAGE_TABLE 't' #define SHARD_STORAGE_COLUMNAR 'c' +#define SHARD_STORAGE_VIRTUAL 'v' #endif /* PG_DIST_SHARD_H */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 420f3ac3b..da29535b7 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -136,6 +136,12 @@ extern StringInfo UserTaskFilename(StringInfo directoryName, uint32 taskId); extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList); extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList); extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename); +extern DestReceiver * CreateFileDestReceiver(char *filePath, + MemoryContext tupleContext, + bool binaryCopyFormat); +extern void FileDestReceiverStats(DestReceiver *dest, + uint64 *rowsSent, + uint64 *bytesSent); /* Function declaration for parsing tree node */ extern Node * ParseTreeNode(const char *ddlCommand); diff --git a/src/test/regress/expected/partitioned_intermediate_results.out b/src/test/regress/expected/partitioned_intermediate_results.out new file mode 100644 index 000000000..560cba703 --- /dev/null +++ b/src/test/regress/expected/partitioned_intermediate_results.out @@ -0,0 +1,513 @@ +-- Test functions for partitioning intermediate results +CREATE SCHEMA partitioned_intermediate_results; +SET search_path TO 'partitioned_intermediate_results'; +-- hash partitioned intermediate results +BEGIN; +SELECT * FROM worker_partition_query_result('squares_hash', + 'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash', + '{-2147483648,-1073741824,0,1073741824}'::text[], + '{-1073741825,-1,1073741823,2147483647}'::text[], false); + partition_index | rows_written | bytes_written +-----------------+--------------+--------------- + 0 | 4 | 21 + 1 | 3 | 14 + 2 | 1 | 5 + 3 | 2 | 9 +(4 rows) + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int) +ORDER BY x; + hashint4 | x | x2 +-------------+----+----- + -1905060026 | 1 | 1 + -1330264708 | 5 | 25 + -2047600124 | 8 | 64 + -1547814713 | 10 | 100 +(4 rows) + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_1', 'text') AS res (x int, x2 int) +ORDER BY x; + hashint4 | x | x2 +-------------+---+---- + -28094569 | 3 | 9 + -1011077333 | 4 | 16 + -978793473 | 7 | 49 +(3 rows) + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_2', 'text') AS res (x int, x2 int) +ORDER BY x; + hashint4 | x | x2 +-----------+---+---- + 566031088 | 6 | 36 +(1 row) + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_3', 'text') AS res (x int, x2 int) +ORDER BY x; + hashint4 | x | x2 +------------+---+---- + 1134484726 | 2 | 4 + 1672378334 | 9 | 81 +(2 rows) + +END; +-- range partitioned intermediate results +BEGIN; +SELECT * FROM worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, /* partition by x^2 */ + 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true /* binary format */); + partition_index | rows_written | bytes_written +-----------------+--------------+--------------- + 0 | 4 | 93 + 1 | 2 | 57 + 2 | 1 | 39 + 3 | 3 | 75 +(4 rows) + +SELECT x, x2 FROM +read_intermediate_result('squares_range_0', 'binary') AS res (x int, x2 int) +ORDER BY x; + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 +(4 rows) + +SELECT x, x2 FROM 
+read_intermediate_result('squares_range_1', 'binary') AS res (x int, x2 int) +ORDER BY x; + x | x2 +---+---- + 5 | 25 + 6 | 36 +(2 rows) + +SELECT x, x2 FROM +read_intermediate_result('squares_range_2', 'binary') AS res (x int, x2 int) +ORDER BY x; + x | x2 +---+---- + 7 | 49 +(1 row) + +SELECT x, x2 FROM +read_intermediate_result('squares_range_3', 'binary') AS res (x int, x2 int) +ORDER BY x; + x | x2 +----+----- + 8 | 64 + 9 | 81 + 10 | 100 +(3 rows) + +END; +-- 1M rows, just in case. text format. +BEGIN; +SELECT * FROM worker_partition_query_result('doubles_hash', + 'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'hash', + '{-2147483648,-1073741824,0,1073741824}'::text[], + '{-1073741825,-1,1073741823,2147483647}'::text[], false); + partition_index | rows_written | bytes_written +-----------------+--------------+--------------- + 0 | 250199 | 3586179 + 1 | 249872 | 3581280 + 2 | 250278 | 3587487 + 3 | 249651 | 3578401 +(4 rows) + +SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_hash_0', + 'doubles_hash_1', + 'doubles_hash_2', + 'doubles_hash_3'], 'text') AS res (x int, x2 int); + count +--------- + 1000000 +(1 row) + +END; +-- 1M rows, just in case. binary format. +BEGIN; +SELECT * FROM worker_partition_query_result('doubles_range', + 'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'range', + '{0,250001,500001,750001}'::text[], + '{250000,500000,750000,1000000}'::text[], true); + partition_index | rows_written | bytes_written +-----------------+--------------+--------------- + 0 | 250000 | 4500021 + 1 | 250000 | 4500021 + 2 | 250000 | 4500021 + 3 | 250000 | 4500021 +(4 rows) + +SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_range_0', + 'doubles_range_1', + 'doubles_range_2', + 'doubles_range_3'], 'binary') AS res (x int, x2 int); + count +--------- + 1000000 +(1 row) + +END; +-- +-- Some error cases +-- +-- not allowed outside transaction block +SELECT * FROM worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', '{0}'::text[], '{20}'::text[], true); +ERROR: worker_partition_query_result can only be used in a transaction block +BEGIN; +SAVEPOINT s1; +-- syntax error in query +SELECT worker_partition_query_result('squares_range', + 'SELECxT i, i * i FROM generate_series(1, 10) i', + 1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ERROR: syntax error at or near "SELECxT" +LINE 1: SELECT worker_partition_query_result('squares_range', + ^ +ROLLBACK TO SAVEPOINT s1; +-- invalid result prefix +SELECT worker_partition_query_result('squares_range/a/', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ERROR: result key "squares_range/a/" contains invalid character +HINT: Result keys may only contain letters, numbers, underscores and hyphens. 
+ROLLBACK TO SAVEPOINT s1;
+-- empty min/max values
+SELECT worker_partition_query_result('squares_range',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     1, 'range', ARRAY[]::text[], ARRAY[]::text[], true);
+ERROR: number of partitions cannot be 0
+ROLLBACK TO SAVEPOINT s1;
+-- append partitioning
+SELECT worker_partition_query_result('squares_range',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     1, 'append',
+                                     '{0,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: only hash and range partitioning schemes are supported
+ROLLBACK TO SAVEPOINT s1;
+-- query with no results
+CREATE TABLE t(a int);
+SELECT worker_partition_query_result('squares_range',
+                                     'INSERT INTO t VALUES (1), (2)',
+                                     1, 'range',
+                                     '{0,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: query must generate a set of rows
+ROLLBACK TO SAVEPOINT s1;
+-- negative partition index
+SELECT worker_partition_query_result('squares_range',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     -1, 'range',
+                                     '{0,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: partition column index must be between 0 and 1
+ROLLBACK TO SAVEPOINT s1;
+-- too large partition index
+SELECT worker_partition_query_result('squares_range',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     2, 'range',
+                                     '{0,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: partition column index must be between 0 and 1
+ROLLBACK TO SAVEPOINT s1;
+-- min/max values of different lengths
+SELECT worker_partition_query_result('squares_range',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     1, 'range',
+                                     '{0,21,41,61,101}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: min values and max values must have the same number of elements
+ROLLBACK TO SAVEPOINT s1;
+-- null values in min/max values of hash partitioned results
+SELECT worker_partition_query_result('squares_hash',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i',
+                                     1, 'hash',
+                                     '{NULL,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: hash partitioned table has uninitialized shards
+ROLLBACK TO SAVEPOINT s1;
+-- multiple queries
+SELECT worker_partition_query_result('squares_hash',
+                                     'SELECT i, i * i FROM generate_series(1, 10) i; SELECT 4, 16;',
+                                     1, 'hash',
+                                     '{NULL,21,41,61}'::text[],
+                                     '{20,40,60,100}'::text[],
+                                     true);
+ERROR: cannot execute multiple utility events
+ROLLBACK TO SAVEPOINT s1;
+ROLLBACK;
+--
+-- Procedure for conveniently testing worker_partition_query_result(). It uses
+-- worker_partition_query_result to partition the result of a query using the
+-- same scheme as the distributed table rel, and then checks that it did the
+-- partitioning the same way as the shards of rel.
+-- +CREATE OR REPLACE PROCEDURE test_partition_query_results(rel regclass, query text, + binaryCopy boolean DEFAULT true) +AS $$ +DECLARE + partition_min_values text[]; + partition_max_values text[]; + partition_column_index int; + partition_method citus.distribution_type; + partitioned_results_row_counts text[]; + distributed_table_row_counts text[]; + tuple_def text; + partition_result_names text[]; + non_empty_partitions int[]; + rows_different int; +BEGIN + -- get tuple definition + SELECT string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ' ORDER BY a.attnum) + INTO tuple_def + FROM pg_catalog.pg_attribute a + WHERE a.attrelid = rel::oid AND a.attnum > 0 AND NOT a.attisdropped; + + -- get min/max value arrays + SELECT array_agg(shardminvalue ORDER BY shardid), + array_agg(shardmaxvalue ORDER BY shardid) + INTO partition_min_values, partition_max_values + FROM pg_dist_shard + WHERE logicalrelid=rel; + + -- get partition column index and partition method + SELECT (regexp_matches(partkey, ':varattno ([0-9]+)'))[1]::int - 1, + (CASE WHEN partmethod='h' THEN 'hash' ELSE 'range' END) + INTO partition_column_index, partition_method + FROM pg_dist_partition + WHERE logicalrelid=rel; + + -- insert into the distributed table + EXECUTE 'INSERT INTO ' || rel::text || ' ' || query; + + -- repartition the query locally + SELECT array_agg(rows_written::text ORDER BY partition_index), + array_agg(partition_index) FILTER (WHERE rows_written > 0) + INTO partitioned_results_row_counts, + non_empty_partitions + FROM worker_partition_query_result('test_prefix', query, partition_column_index, + partition_method, partition_min_values, + partition_max_values, binaryCopy); + + SELECT array_agg('test_prefix_' || i::text) + INTO partition_result_names + FROM unnest(non_empty_partitions) i; + + EXECUTE 'SELECT count(*) FROM ((' || query || ') EXCEPT (SELECT * FROM read_intermediate_results($1,$2) AS res (' || tuple_def || '))) t' + INTO rows_different + USING partition_result_names, (CASE WHEN binaryCopy THEN 'binary' ELSE 'text' END)::pg_catalog.citus_copy_format; + + -- commit so results are available in run_command_on_shards + COMMIT; + + -- rows per shard of the distributed table + SELECT array_agg(result order by shardid) INTO distributed_table_row_counts + FROM run_command_on_shards(rel, 'SELECT count(*) FROM %s'); + + IF partitioned_results_row_counts = distributed_table_row_counts THEN + RAISE NOTICE 'Rows per partition match ...'; + ELSE + RAISE 'FAILED: rows per partition do not match, expecting % got %', distributed_table_row_counts, partitioned_results_row_counts; + END IF; + + IF rows_different = 0 THEN + RAISE NOTICE 'Row values match ...'; + ELSE + RAISE 'FAILED: Could not find % of expected rows in partitions', rows_different; + END IF; + + RAISE NOTICE 'PASSED.'; +END; +$$ LANGUAGE plpgsql; +-- +-- Procedure for creating shards for range partitioned distributed table. 
+-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql; +\set VERBOSITY terse +-- hash partitioning, 32 shards +SET citus.shard_count TO 32; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- hash partitioning, 1 shard +SET citus.shard_count TO 1; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- hash partitioning, 17 shards (so hash partitions aren't uniform) +SET citus.shard_count TO 17; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- hash partitioning, date partition column +SET citus.shard_count TO 8; +CREATE TABLE t(a DATE, b int); +SELECT create_distributed_table('t', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CALL test_partition_query_results('t', 'SELECT (''1985-05-18''::date + (x::text || '' days'')::interval)::date, x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- hash partitioning, int4 range partition column +SET citus.shard_count TO 8; +CREATE TABLE t(a int4range, b int); +SELECT create_distributed_table('t', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CALL test_partition_query_results('t', 'SELECT int4range(x,2*x+10), x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- range partitioning, int partition column +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +CALL create_range_partitioned_shards('t', '{0,25,50,76}', + '{24,49,75,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. 
+DROP TABLE t; +-- not covering ranges, should ERROR +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +CALL create_range_partitioned_shards('t', '{0,25,50,100}', + '{24,49,75,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +ERROR: could not find shard for partition column value +DROP TABLE t; +-- overlapping ranges, we allow this in range partitioned distributed tables, should be fine +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +CALL create_range_partitioned_shards('t', '{0,25,50,76}', + '{50,49,90,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +-- range partitioning, composite partition column +CREATE TYPE composite_key_type AS (f1 int, f2 text); +SET citus.shard_count TO 8; +CREATE TABLE t(key composite_key_type, value int); +SELECT create_distributed_table('t', 'key', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', + '{"(24,z)","(49,z)","(74,z)","(100,z)"}'); +CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. +DROP TABLE t; +DROP TYPE composite_key_type; +-- unsorted ranges +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +CALL create_range_partitioned_shards('t', '{50,25,76,0}', + '{75,49,200,24}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +NOTICE: Rows per partition match ... +NOTICE: Row values match ... +NOTICE: PASSED. 
+DROP TABLE t; +SET client_min_messages TO WARNING; +DROP SCHEMA partitioned_intermediate_results CASCADE; +\set VERBOSITY default +SET client_min_messages TO DEFAULT; +SET citus.shard_count TO DEFAULT; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4542b18da..e3c251c71 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -70,7 +70,7 @@ test: subquery_prepared_statements pg12 # Miscellaneous tests to check our query planning behavior # ---------- test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results limit_intermediate_size -test: multi_explain hyperscale_tutorial +test: multi_explain hyperscale_tutorial partitioned_intermediate_results test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: sql_procedure multi_function_in_join row_types materialized_view diff --git a/src/test/regress/sql/partitioned_intermediate_results.sql b/src/test/regress/sql/partitioned_intermediate_results.sql new file mode 100644 index 000000000..8d4d93f13 --- /dev/null +++ b/src/test/regress/sql/partitioned_intermediate_results.sql @@ -0,0 +1,368 @@ +-- Test functions for partitioning intermediate results +CREATE SCHEMA partitioned_intermediate_results; +SET search_path TO 'partitioned_intermediate_results'; + + +-- hash partitioned intermediate results +BEGIN; +SELECT * FROM worker_partition_query_result('squares_hash', + 'SELECT i, i * i FROM generate_series(1, 10) i', 0, 'hash', + '{-2147483648,-1073741824,0,1073741824}'::text[], + '{-1073741825,-1,1073741823,2147483647}'::text[], false); +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_0', 'text') AS res (x int, x2 int) +ORDER BY x; + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_1', 'text') AS res (x int, x2 int) +ORDER BY x; + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_2', 'text') AS res (x int, x2 int) +ORDER BY x; + +SELECT hashint4(x), x, x2 FROM +read_intermediate_result('squares_hash_3', 'text') AS res (x int, x2 int) +ORDER BY x; + +END; + +-- range partitioned intermediate results +BEGIN; +SELECT * FROM worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, /* partition by x^2 */ + 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true /* binary format */); +SELECT x, x2 FROM +read_intermediate_result('squares_range_0', 'binary') AS res (x int, x2 int) +ORDER BY x; + +SELECT x, x2 FROM +read_intermediate_result('squares_range_1', 'binary') AS res (x int, x2 int) +ORDER BY x; + +SELECT x, x2 FROM +read_intermediate_result('squares_range_2', 'binary') AS res (x int, x2 int) +ORDER BY x; + +SELECT x, x2 FROM +read_intermediate_result('squares_range_3', 'binary') AS res (x int, x2 int) +ORDER BY x; + +END; + +-- 1M rows, just in case. text format. 
+BEGIN; +SELECT * FROM worker_partition_query_result('doubles_hash', + 'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'hash', + '{-2147483648,-1073741824,0,1073741824}'::text[], + '{-1073741825,-1,1073741823,2147483647}'::text[], false); +SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_hash_0', + 'doubles_hash_1', + 'doubles_hash_2', + 'doubles_hash_3'], 'text') AS res (x int, x2 int); +END; + +-- 1M rows, just in case. binary format. +BEGIN; +SELECT * FROM worker_partition_query_result('doubles_range', + 'SELECT i, i * 2 FROM generate_series(1, 1000000) i', 0, 'range', + '{0,250001,500001,750001}'::text[], + '{250000,500000,750000,1000000}'::text[], true); +SELECT count(*) FROM read_intermediate_results(ARRAY['doubles_range_0', + 'doubles_range_1', + 'doubles_range_2', + 'doubles_range_3'], 'binary') AS res (x int, x2 int); +END; + +-- +-- Some error cases +-- + +-- not allowed outside transaction block +SELECT * FROM worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', '{0}'::text[], '{20}'::text[], true); + +BEGIN; +SAVEPOINT s1; +-- syntax error in query +SELECT worker_partition_query_result('squares_range', + 'SELECxT i, i * i FROM generate_series(1, 10) i', + 1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- invalid result prefix +SELECT worker_partition_query_result('squares_range/a/', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- empty min/max values +SELECT worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', ARRAY[]::text[], ARRAY[]::text[], true); +ROLLBACK TO SAVEPOINT s1; + +-- append partitioning +SELECT worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'append', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- query with no results +CREATE TABLE t(a int); +SELECT worker_partition_query_result('squares_range', + 'INSERT INTO t VALUES (1), (2)', + 1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- negative partition index +SELECT worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + -1, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- too large partition index +SELECT worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 2, 'range', + '{0,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- min/max values of different lengths +SELECT worker_partition_query_result('squares_range', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'range', + '{0,21,41,61,101}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- null values in min/max values of hash partitioned results +SELECT worker_partition_query_result('squares_hash', + 'SELECT i, i * i FROM generate_series(1, 10) i', + 1, 'hash', + '{NULL,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK TO SAVEPOINT s1; + +-- multiple queries +SELECT worker_partition_query_result('squares_hash', + 'SELECT i, i * i FROM generate_series(1, 10) i; SELECT 4, 16;', + 1, 'hash', + '{NULL,21,41,61}'::text[], + '{20,40,60,100}'::text[], + true); +ROLLBACK 
TO SAVEPOINT s1;
+ROLLBACK;
+
+--
+-- Procedure for conveniently testing worker_partition_query_result(). It uses
+-- worker_partition_query_result() to partition the result of query with the
+-- same scheme as the distributed table rel, and then verifies that the
+-- partitioning matches the contents of rel's shards.
+--
+CREATE OR REPLACE PROCEDURE test_partition_query_results(rel regclass, query text,
+                                                         binaryCopy boolean DEFAULT true)
+AS $$
+DECLARE
+  partition_min_values text[];
+  partition_max_values text[];
+  partition_column_index int;
+  partition_method citus.distribution_type;
+  partitioned_results_row_counts text[];
+  distributed_table_row_counts text[];
+  tuple_def text;
+  partition_result_names text[];
+  non_empty_partitions int[];
+  rows_different int;
+BEGIN
+  -- get tuple definition
+  SELECT string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ' ORDER BY a.attnum)
+  INTO tuple_def
+  FROM pg_catalog.pg_attribute a
+  WHERE a.attrelid = rel::oid AND a.attnum > 0 AND NOT a.attisdropped;
+
+  -- get min/max value arrays
+  SELECT array_agg(shardminvalue ORDER BY shardid),
+         array_agg(shardmaxvalue ORDER BY shardid)
+  INTO partition_min_values, partition_max_values
+  FROM pg_dist_shard
+  WHERE logicalrelid=rel;
+
+  -- get partition column index and partition method
+  SELECT (regexp_matches(partkey, ':varattno ([0-9]+)'))[1]::int - 1,
+         (CASE WHEN partmethod='h' THEN 'hash' ELSE 'range' END)
+  INTO partition_column_index, partition_method
+  FROM pg_dist_partition
+  WHERE logicalrelid=rel;
+
+  -- insert into the distributed table
+  EXECUTE 'INSERT INTO ' || rel::text || ' ' || query;
+
+  -- repartition the query locally
+  SELECT array_agg(rows_written::text ORDER BY partition_index),
+         array_agg(partition_index) FILTER (WHERE rows_written > 0)
+  INTO partitioned_results_row_counts,
+       non_empty_partitions
+  FROM worker_partition_query_result('test_prefix', query, partition_column_index,
+                                     partition_method, partition_min_values,
+                                     partition_max_values, binaryCopy);
+
+  SELECT array_agg('test_prefix_' || i::text)
+  INTO partition_result_names
+  FROM unnest(non_empty_partitions) i;
+
+  EXECUTE 'SELECT count(*) FROM ((' || query || ') EXCEPT (SELECT * FROM read_intermediate_results($1,$2) AS res (' || tuple_def || '))) t'
+  INTO rows_different
+  USING partition_result_names, (CASE WHEN binaryCopy THEN 'binary' ELSE 'text' END)::pg_catalog.citus_copy_format;
+
+  -- commit so results are available in run_command_on_shards
+  COMMIT;
+
+  -- rows per shard of the distributed table
+  SELECT array_agg(result ORDER BY shardid) INTO distributed_table_row_counts
+  FROM run_command_on_shards(rel, 'SELECT count(*) FROM %s');
+
+  IF partitioned_results_row_counts = distributed_table_row_counts THEN
+    RAISE NOTICE 'Rows per partition match ...';
+  ELSE
+    RAISE 'FAILED: rows per partition do not match; expected %, got %', distributed_table_row_counts, partitioned_results_row_counts;
+  END IF;
+
+  IF rows_different = 0 THEN
+    RAISE NOTICE 'Row values match ...';
+  ELSE
+    RAISE 'FAILED: % expected rows were not found in the partitioned results', rows_different;
+  END IF;
+
+  RAISE NOTICE 'PASSED.';
+END;
+$$ LANGUAGE plpgsql;
+
+--
+-- Procedure for creating shards of a range-partitioned distributed table.
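+-- The two arrays are parallel: element i defines shard i's inclusive
+-- [min, max] range, matching the inclusive shardminvalue/shardmaxvalue
+-- bounds of range-distributed tables. For example (illustrative values):
+--   CALL create_range_partitioned_shards('t', '{0,50}', '{49,100}');
+-- would create two shards covering [0, 49] and [50, 100].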
+-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +\set VERBOSITY terse + +-- hash partitioning, 32 shards +SET citus.shard_count TO 32; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +DROP TABLE t; + +-- hash partitioning, 1 shard +SET citus.shard_count TO 1; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +DROP TABLE t; + +-- hash partitioning, 17 shards (so hash partitions aren't uniform) +SET citus.shard_count TO 17; +CREATE TABLE t(a int, b int); +SELECT create_distributed_table('t', 'a'); +CALL test_partition_query_results('t', 'SELECT x, x * x FROM generate_series(1, 100) x'); +DROP TABLE t; + +-- hash partitioning, date partition column +SET citus.shard_count TO 8; +CREATE TABLE t(a DATE, b int); +SELECT create_distributed_table('t', 'a'); +CALL test_partition_query_results('t', 'SELECT (''1985-05-18''::date + (x::text || '' days'')::interval)::date, x * x FROM generate_series(1, 100) x'); +DROP TABLE t; + +-- hash partitioning, int4 range partition column +SET citus.shard_count TO 8; +CREATE TABLE t(a int4range, b int); +SELECT create_distributed_table('t', 'a'); +CALL test_partition_query_results('t', 'SELECT int4range(x,2*x+10), x * x FROM generate_series(1, 100) x'); +DROP TABLE t; + +-- range partitioning, int partition column +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); +CALL create_range_partitioned_shards('t', '{0,25,50,76}', + '{24,49,75,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +DROP TABLE t; + +-- not covering ranges, should ERROR +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); +CALL create_range_partitioned_shards('t', '{0,25,50,100}', + '{24,49,75,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +DROP TABLE t; + +-- overlapping ranges, we allow this in range partitioned distributed tables, should be fine +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 'key', 'range'); +CALL create_range_partitioned_shards('t', '{0,25,50,76}', + '{50,49,90,200}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +DROP TABLE t; + +-- range partitioning, composite partition column +CREATE TYPE composite_key_type AS (f1 int, f2 text); +SET citus.shard_count TO 8; +CREATE TABLE t(key composite_key_type, value int); +SELECT create_distributed_table('t', 'key', 'range'); +CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', + '{"(24,z)","(49,z)","(74,z)","(100,z)"}'); +CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x'); +DROP TABLE t; +DROP TYPE composite_key_type; + +-- unsorted ranges +CREATE TABLE t(key int, value int); +SELECT create_distributed_table('t', 
'key', 'range'); +CALL create_range_partitioned_shards('t', '{50,25,76,0}', + '{75,49,200,24}'); +CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); +DROP TABLE t; + + +SET client_min_messages TO WARNING; +DROP SCHEMA partitioned_intermediate_results CASCADE; + +\set VERBOSITY default +SET client_min_messages TO DEFAULT; +SET citus.shard_count TO DEFAULT;
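+
+-- For reference, the interface exercised above (argument names are
+-- descriptive, inferred from the calls in this file, not authoritative):
+--   worker_partition_query_result(result_prefix, query,
+--                                 partition_column_index, partition_method,
+--                                 partition_min_values, partition_max_values,
+--                                 use_binary_copy)
+-- It returns one row per partition with at least partition_index and
+-- rows_written; partition i is written as the intermediate result named
+-- '<result_prefix>_<i>' and can be read back with
+-- read_intermediate_result('<result_prefix>_<i>', 'text'|'binary').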