Merge pull request #2075 from citusdata/hash_single_repartition

Implement hash-repartitioning for single repartition joins (i.e., enable single repartition joins among hash distributed tables)
pull/2141/head
Önder Kalacı 2018-05-02 22:07:31 +03:00 committed by GitHub
commit 7fd4383886
33 changed files with 1094 additions and 115 deletions
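
For context, a minimal sketch of how the new feature is exercised, based on the regression tests further down (the GUC names come from this commit; the query is illustrative and assumes both tables are hash-distributed on id):

    SET citus.enable_single_hash_repartition_joins TO on;
    SET citus.enable_repartition_joins TO on;
    -- t1 joins on its distribution column, t2 does not; only t2 is
    -- hash-repartitioned to match t1's shard ranges
    SELECT count(*)
    FROM single_hash_repartition_first t1, single_hash_repartition_second t2
    WHERE t1.id = t2.sum;

With citus.enable_single_hash_repartition_joins off (the default), the planner falls back to a dual hash repartition join for the same query.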

View File

@ -15,7 +15,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.1-1 7.1-2 7.1-3 7.1-4 \
7.2-1 7.2-2 7.2-3 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2
7.4-1 7.4-2 7.4-3
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -197,6 +197,8 @@ $(EXTENSION)--7.4-1.sql: $(EXTENSION)--7.3-3.sql $(EXTENSION)--7.3-3--7.4-1.sql
cat $^ > $@
$(EXTENSION)--7.4-2.sql: $(EXTENSION)--7.4-1.sql $(EXTENSION)--7.4-1--7.4-2.sql
cat $^ > $@
$(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,13 @@
/* citus--7.4-2--7.4-3 */
SET search_path = 'pg_catalog';
-- note that we're not dropping the older version of the function
CREATE FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, anyarray)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_hash_partition_table$$;
COMMENT ON FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid,
anyarray)
IS 'hash partition query results';
RESET search_path;
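
For illustration, a hypothetical invocation of the new signature: the sixth argument now carries the hash-range split points (the shard min values) rather than a bucket count. The job/task ids, filter query, and array values below are made up:

    SELECT worker_hash_partition_table(
        42,                                      -- job id
        1,                                       -- task id
        'SELECT id, sum FROM some_shard_102008', -- filter query (hypothetical)
        'id',                                    -- partition column
        'integer'::regtype,                      -- partition column type
        ARRAY[-2147483648, -1073741824, 0, 1073741824]); -- hash-range min values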

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.4-2'
default_version = '7.4-3'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
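
With the default version bumped, existing installations pick up the new script through the usual upgrade path (exercised in the multi_extension test below):

    ALTER EXTENSION citus UPDATE TO '7.4-3';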

View File

@ -35,6 +35,7 @@
/* Config variables managed via guc.c */
bool LogMultiJoinOrder = false; /* print join order as a debugging aid */
bool EnableSingleHashRepartitioning = false;
/* Function pointer type definition for join rule evaluation functions */
typedef JoinOrderNode *(*RuleEvalFunction) (JoinOrderNode *currentJoinNode,
@ -178,7 +179,7 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode,
nextTable, joinClauseList, joinType);
if (nextJoinNode->joinRuleType >= SINGLE_PARTITION_JOIN)
if (nextJoinNode->joinRuleType >= SINGLE_HASH_PARTITION_JOIN)
{
/* re-partitioning for OUTER joins is not implemented */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -669,7 +670,8 @@ LargeDataTransferLocation(List *joinOrder)
JoinRuleType joinRuleType = joinOrderNode->joinRuleType;
/* we consider the following join rules to cause large data transfers */
if (joinRuleType == SINGLE_PARTITION_JOIN ||
if (joinRuleType == SINGLE_HASH_PARTITION_JOIN ||
joinRuleType == SINGLE_RANGE_PARTITION_JOIN ||
joinRuleType == DUAL_PARTITION_JOIN ||
joinRuleType == CARTESIAN_PRODUCT)
{
@ -862,7 +864,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType)
{
RuleEvalFunctionArray[REFERENCE_JOIN] = &ReferenceJoin;
RuleEvalFunctionArray[LOCAL_PARTITION_JOIN] = &LocalJoin;
RuleEvalFunctionArray[SINGLE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct;
@ -888,7 +891,10 @@ JoinRuleName(JoinRuleType ruleType)
/* use strdup() to be independent of memory contexts */
RuleNameArray[REFERENCE_JOIN] = strdup("reference join");
RuleNameArray[LOCAL_PARTITION_JOIN] = strdup("local partition join");
RuleNameArray[SINGLE_PARTITION_JOIN] = strdup("single partition join");
RuleNameArray[SINGLE_HASH_PARTITION_JOIN] =
strdup("single hash partition join");
RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] =
strdup("single range partition join");
RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join");
RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product");
@ -1043,6 +1049,9 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
Var *currentPartitionColumn = currentJoinNode->partitionColumn;
char currentPartitionMethod = currentJoinNode->partitionMethod;
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType;
OpExpr *joinClause = NULL;
Oid relationId = candidateTable->relationId;
uint32 tableId = candidateTable->rangeTableId;
@ -1056,22 +1065,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
/*
* If we previously dual-hash re-partitioned the tables for a join, we
* currently don't allow a single-repartition join.
* If we previously dual-hash re-partitioned the tables for a join or formed a
* cartesian product, we currently don't allow a single-repartition join.
*/
if (currentPartitionMethod == REDISTRIBUTE_BY_HASH)
if (currentJoinRuleType == DUAL_PARTITION_JOIN ||
currentJoinRuleType == CARTESIAN_PRODUCT)
{
return NULL;
}
if (currentPartitionMethod != DISTRIBUTE_BY_HASH)
{
OpExpr *joinClause = SinglePartitionJoinClause(currentPartitionColumn,
applicableJoinClauses);
joinClause =
SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses);
if (joinClause != NULL)
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN,
if (currentPartitionMethod == DISTRIBUTE_BY_HASH)
{
/*
* Single hash repartitioning may perform worse than dual hash
* repartitioning. Thus, we control it via a guc.
*/
if (!EnableSingleHashRepartitioning)
{
return NULL;
}
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionMethod,
currentAnchorTable);
}
else
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionMethod,
currentAnchorTable);
@ -1079,19 +1104,39 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
/* evaluate re-partitioning the current table only if the rule didn't apply above */
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH &&
candidatePartitionMethod != DISTRIBUTE_BY_NONE)
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_NONE)
{
OpExpr *joinClause = SinglePartitionJoinClause(candidatePartitionColumn,
applicableJoinClauses);
if (joinClause != NULL)
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN,
if (candidatePartitionMethod == DISTRIBUTE_BY_HASH)
{
/*
* Single hash repartitioning may perform worse than dual hash
* repartitioning. Thus, we control it via a guc.
*/
if (!EnableSingleHashRepartitioning)
{
return NULL;
}
nextJoinNode = MakeJoinOrderNode(candidateTable,
SINGLE_HASH_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionMethod,
candidateTable);
}
else
{
nextJoinNode = MakeJoinOrderNode(candidateTable,
SINGLE_RANGE_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionMethod,
candidateTable);
}
}
}
return nextJoinNode;
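
The chosen rule is easy to observe via citus.log_multi_join_order; a sketch taken from the new regression test below:

    SET citus.enable_single_hash_repartition_joins TO on;
    SET citus.log_multi_join_order TO on;
    EXPLAIN SELECT count(*)
    FROM single_hash_repartition_first t1, single_hash_repartition_second t2
    WHERE t1.id = t2.sum;
    -- LOG: join order: [ "single_hash_repartition_first" ]
    --      [ single hash partition join "single_hash_repartition_second" ]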

View File

@ -94,7 +94,15 @@ static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
static MultiNode * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses);
static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses);
static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
@ -2208,7 +2216,10 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
{
RuleApplyFunctionArray[REFERENCE_JOIN] = &ApplyReferenceJoin;
RuleApplyFunctionArray[LOCAL_PARTITION_JOIN] = &ApplyLocalJoin;
RuleApplyFunctionArray[SINGLE_PARTITION_JOIN] = &ApplySinglePartitionJoin;
RuleApplyFunctionArray[SINGLE_HASH_PARTITION_JOIN] =
&ApplySingleHashPartitionJoin;
RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] =
&ApplySingleRangePartitionJoin;
RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct;
@ -2264,12 +2275,50 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
}
/*
* ApplySingleRangePartitionJoin is a wrapper around ApplySinglePartitionJoin()
* which sets the joinRuleType properly.
*/
static MultiNode *
ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode =
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType,
applicableJoinClauses);
joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN;
return (MultiNode *) joinNode;
}
/*
* ApplySingleHashPartitionJoin is a wrapper around ApplySinglePartitionJoin()
* which sets the joinRuleType properly.
*/
static MultiNode *
ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode =
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType,
applicableJoinClauses);
joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN;
return (MultiNode *) joinNode;
}
/*
* ApplySinglePartitionJoin creates a new MultiJoin node that joins the left and
* right node. The function also adds a MultiPartition node on top of the node
* (left or right) that is not partitioned on the join column.
*/
static MultiNode *
static MultiJoin *
ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
@ -2337,11 +2386,10 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
}
/* finally set join operator fields */
joinNode->joinRuleType = SINGLE_PARTITION_JOIN;
joinNode->joinType = joinType;
joinNode->joinClauseList = applicableJoinClauses;
return (MultiNode *) joinNode;
return joinNode;
}

View File

@ -14,6 +14,7 @@
#include "postgres.h"
#include <math.h>
#include <stdint.h>
#include "miscadmin.h"
@ -320,7 +321,8 @@ BuildJobTree(MultiTreeRoot *multiTree)
if (currentNodeType == T_MultiJoin)
{
MultiJoin *joinNode = (MultiJoin *) currentNode;
if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN ||
if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN ||
joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN ||
joinNode->joinRuleType == DUAL_PARTITION_JOIN)
{
boundaryNodeJobType = JOIN_MAP_MERGE_JOB;
@ -350,14 +352,19 @@ BuildJobTree(MultiTreeRoot *multiTree)
PartitionType partitionType = PARTITION_INVALID_FIRST;
Oid baseRelationId = InvalidOid;
if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN)
if (joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN)
{
partitionType = RANGE_PARTITION_TYPE;
baseRelationId = RangePartitionJoinBaseRelationId(joinNode);
}
else if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN)
{
partitionType = SINGLE_HASH_PARTITION_TYPE;
baseRelationId = RangePartitionJoinBaseRelationId(joinNode);
}
else if (joinNode->joinRuleType == DUAL_PARTITION_JOIN)
{
partitionType = HASH_PARTITION_TYPE;
partitionType = DUAL_HASH_PARTITION_TYPE;
}
if (CitusIsA(leftChildNode, MultiPartition))
@ -411,7 +418,8 @@ BuildJobTree(MultiTreeRoot *multiTree)
Query *jobQuery = BuildJobQuery(queryNode, dependedJobList);
MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList,
partitionKey, HASH_PARTITION_TYPE,
partitionKey,
DUAL_HASH_PARTITION_TYPE,
InvalidOid,
SUBQUERY_MAP_MERGE_JOB);
@ -1849,21 +1857,20 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
* If join type is not set, this means this job represents a subquery, and
* uses hash partitioning.
*/
if (partitionType == HASH_PARTITION_TYPE)
if (partitionType == DUAL_HASH_PARTITION_TYPE)
{
uint32 partitionCount = HashPartitionCount();
mapMergeJob->partitionType = HASH_PARTITION_TYPE;
mapMergeJob->partitionType = DUAL_HASH_PARTITION_TYPE;
mapMergeJob->partitionCount = partitionCount;
}
else if (partitionType == RANGE_PARTITION_TYPE)
else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType ==
RANGE_PARTITION_TYPE)
{
/* build the split point object for the table on the right-hand side */
DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId);
bool hasUninitializedShardInterval = false;
uint32 shardCount = cache->shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray;
char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY = 0;
hasUninitializedShardInterval = cache->hasUninitializedShardInterval;
if (hasUninitializedShardInterval)
@ -1872,12 +1879,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
"missing min/max values")));
}
basePartitionMethod = PartitionMethod(baseRelationId);
/* this join-type currently doesn't work for hash partitioned tables */
Assert(basePartitionMethod != DISTRIBUTE_BY_HASH);
mapMergeJob->partitionType = RANGE_PARTITION_TYPE;
mapMergeJob->partitionType = partitionType;
mapMergeJob->partitionCount = shardCount;
mapMergeJob->sortedShardIntervalArray = sortedShardIntervalArray;
mapMergeJob->sortedShardIntervalArrayLength = shardCount;
@ -2729,7 +2731,7 @@ DependsOnHashPartitionJob(Job *job)
if (CitusIsA(dependedJob, MapMergeJob))
{
MapMergeJob *mapMergeJob = (MapMergeJob *) dependedJob;
if (mapMergeJob->partitionType == HASH_PARTITION_TYPE)
if (mapMergeJob->partitionType == DUAL_HASH_PARTITION_TYPE)
{
dependsOnHashPartitionJob = true;
}
@ -3758,6 +3760,7 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment
Task *leftMergeTask = (Task *) leftFragment->fragmentReference;
Task *rightMergeTask = (Task *) rightFragment->fragmentReference;
if (leftMergeTask->partitionId != rightMergeTask->partitionId)
{
ereport(DEBUG2, (errmsg("join prunable for task partitionId %u and %u",
@ -3771,8 +3774,9 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment
}
}
/*
* We have a range (re)partition join. We now get shard intervals for both
* We have a single (re)partition join. We now get shard intervals for both
* fragments, and then check if these intervals overlap.
*/
leftFragmentInterval = FragmentInterval(leftFragment);
@ -4261,13 +4265,32 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, splitPointString->data);
}
else if (partitionType == SINGLE_HASH_PARTITION_TYPE)
{
ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray;
uint32 intervalCount = mapMergeJob->partitionCount;
ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount);
StringInfo splitPointString = SplitPointArrayString(splitPointObject,
partitionColumnType,
partitionColumnTypeMod);
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, splitPointString->data);
}
else
{
uint32 partitionCount = mapMergeJob->partitionCount;
ShardInterval **intervalArray =
GenerateSyntheticShardIntervalArray(partitionCount);
ArrayType *splitPointObject = SplitPointObject(intervalArray,
mapMergeJob->partitionCount);
StringInfo splitPointString =
SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID));
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, partitionCount);
partitionColumnTypeFullName, splitPointString->data);
}
/* convert filter query task into map task */
@ -4282,6 +4305,46 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
}
/*
* GenerateSyntheticShardIntervalArray returns a shard interval pointer array
* which has a uniform hash distribution for the given input partitionCount.
*
* The function only fills the min/max values of the shard intervals. Thus, it
* should not be used for general purpose operations.
*/
ShardInterval **
GenerateSyntheticShardIntervalArray(int partitionCount)
{
ShardInterval **shardIntervalArray = palloc0(partitionCount *
sizeof(ShardInterval *));
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount;
int shardIndex = 0;
for (shardIndex = 0; shardIndex < partitionCount; ++shardIndex)
{
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
/* calculate the split of the hash space */
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
shardInterval->relationId = InvalidOid;
shardInterval->minValueExists = true;
shardInterval->minValue = Int32GetDatum(shardMinHashToken);
shardInterval->maxValueExists = true;
shardInterval->maxValue = Int32GetDatum(shardMaxHashToken);
shardInterval->shardId = INVALID_SHARD_ID;
shardInterval->valueTypeId = INT4OID;
shardIntervalArray[shardIndex] = shardInterval;
}
return shardIntervalArray;
}
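
As a worked example, with partitionCount = 4 the synthetic intervals split the int32 hash space into the same four uniform ranges that appear in the join pruning output of the new regression test. A quick SQL sketch of the arithmetic (hashTokenIncrement = 2^32 / 4 = 1073741824):

    SELECT i AS shard_index,
           -2147483648 + i * 1073741824::bigint AS min_value,
           -2147483648 + (i + 1) * 1073741824::bigint - 1 AS max_value
    FROM generate_series(0, 3) AS i;
    -- [-2147483648,-1073741825], [-1073741824,-1],
    -- [0,1073741823], [1073741824,2147483647]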
/*
* ColumnName resolves the given column's name. The given column could belong to
* a regular table or to an intermediate table formed to execute a distributed
@ -4396,6 +4459,10 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
initialPartitionId = 1;
partitionCount = partitionCount + 1;
}
else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE)
{
initialPartitionId = 0;
}
/* build merge tasks and their associated "map output fetch" tasks */
for (partitionId = initialPartitionId; partitionId < partitionCount; partitionId++)
@ -4463,7 +4530,7 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
/* merge task depends on completion of fetch tasks */
mergeTask->dependedTaskList = mapOutputFetchTaskList;
/* if range repartitioned, each merge task represents an interval */
/* if single repartitioned, each merge task represents an interval */
if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE)
{
int32 mergeTaskIntervalId = partitionId - 1;
@ -4472,6 +4539,14 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId];
}
else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE)
{
int32 mergeTaskIntervalId = partitionId;
ShardInterval **mergeTaskIntervals = mapMergeJob->sortedShardIntervalArray;
Assert(mergeTaskIntervalId >= 0);
mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId];
}
mergeTaskList = lappend(mergeTaskList, mergeTask);
}

View File

@ -373,6 +373,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_single_hash_repartition_joins",
gettext_noop("Enables single hash repartitioning between hash "
"distributed tables"),
NULL,
&EnableSingleHashRepartitioning,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Enables supported subquery pushdown to workers."),

View File

@ -168,8 +168,6 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static void PrepareWorkerNodeCache(void);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
@ -1181,7 +1179,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
* has a uniform hash distribution, as produced by master_create_worker_shards for
* hash partitioned tables.
*/
static bool
bool
HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength)
{

View File

@ -25,11 +25,6 @@
#include "utils/memutils.h"
static int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
/*
* LowestShardIntervalById returns the shard interval with the lowest shard
* ID from a list of shard intervals.
@ -332,8 +327,17 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
* matching a given partition column value and returns its index in the cached
* array. If it cannot find any shard interval with the given value, it returns
* INVALID_SHARD_INDEX.
*
* TODO: Data re-partitioning logic (e.g., worker_hash_partition_table())
* on the worker nodes relies on this function in order to be consistent
* with shard pruning. Since the worker nodes don't have the metadata, a
* synthetically generated ShardInterval ** is passed to this
* function. The synthetic shard intervals contain only shardmin and shardmax
* values. A proper implementation of this approach would introduce an
* intermediate data structure (e.g., ShardRange) on which this function
* operates instead of operating on shard intervals.
*/
static int
int
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction)
{

View File

@ -23,15 +23,18 @@
#include <sys/stat.h>
#include <math.h>
#include <unistd.h>
#include <stdint.h>
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
@ -53,6 +56,9 @@ static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */
/* Local functions forward declarations */
static ShardInterval ** SyntheticShardIntervalArrayForShardMinValues(
Datum *shardMinValues,
int shardCount);
static StringInfo InitTaskAttemptDirectory(uint64 jobId, uint32 taskId);
static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount);
static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount);
@ -73,6 +79,7 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static uint32 RangePartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
static bool FileIsLink(char *filename, struct stat filestat);
@ -178,28 +185,88 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
text *filterQueryText = PG_GETARG_TEXT_P(2);
text *partitionColumnText = PG_GETARG_TEXT_P(3);
Oid partitionColumnType = PG_GETARG_OID(4);
uint32 partitionCount = PG_GETARG_UINT32(5);
ArrayType *hashRangeObject = NULL;
const char *filterQuery = text_to_cstring(filterQueryText);
const char *partitionColumn = text_to_cstring(partitionColumnText);
HashPartitionContext *partitionContext = NULL;
FmgrInfo *hashFunction = NULL;
Datum *hashRangeArray = NULL;
int32 partitionCount = 0;
StringInfo taskDirectory = NULL;
StringInfo taskAttemptDirectory = NULL;
FileOutputStream *partitionFileArray = NULL;
uint32 fileCount = partitionCount;
uint32 fileCount = 0;
uint32 (*HashPartitionIdFunction)(Datum, const void *);
Oid partitionBucketOid = InvalidOid;
CheckCitusVersion(ERROR);
partitionContext = palloc0(sizeof(HashPartitionContext));
/*
* We do this hack for backward compatibility.
*
* In older versions of Citus, worker_hash_partition_table()'s sixth parameter
* was an integer which denoted the number of buckets to split the shard's data
* into. In later versions of Citus, the sixth parameter is an array of hash
* ranges, which defines the ranges used to split the shard's data.
*
* Keeping the old behavior is important when the coordinator's Citus version
* is <= 7.3 and the worker's Citus version is > 7.3.
*/
partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5);
if (partitionBucketOid == INT4ARRAYOID)
{
hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
hashRangeArray = DeconstructArrayObject(hashRangeObject);
partitionCount = ArrayObjectCount(hashRangeObject);
partitionContext->syntheticShardIntervalArray =
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
partitionContext->hasUniformHashDistribution =
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
partitionCount);
HashPartitionIdFunction = &HashPartitionId;
}
else if (partitionBucketOid == INT4OID)
{
partitionCount = PG_GETARG_UINT32(5);
partitionContext->syntheticShardIntervalArray =
GenerateSyntheticShardIntervalArray(partitionCount);
partitionContext->hasUniformHashDistribution = true;
HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI;
}
else
{
/* we should never get any other type of parameter */
ereport(ERROR, (errmsg("unexpected parameter for "
"worker_hash_partition_table()")));
}
/* use column's type information to get the hashing function */
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
/* create hash partition context object */
partitionContext = palloc0(sizeof(HashPartitionContext));
/* we create as many files as the number of split points */
fileCount = partitionCount;
partitionContext->hashFunction = hashFunction;
partitionContext->partitionCount = partitionCount;
/* since we'll use binary search, we need the comparison function */
if (!partitionContext->hasUniformHashDistribution)
{
partitionContext->comparisonFunction =
GetFunctionInfo(partitionColumnType, BTREE_AM_OID, BTORDER_PROC);
}
/* init directories and files to write the partitioned data to */
taskDirectory = InitTaskDirectory(jobId, taskId);
taskAttemptDirectory = InitTaskAttemptDirectory(jobId, taskId);
@ -209,7 +276,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
/* call the partitioning function that does the actual work */
FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
&HashPartitionId, (const void *) partitionContext,
HashPartitionIdFunction, (const void *) partitionContext,
partitionFileArray, fileCount);
/* close partition files and atomically rename (commit) them */
@ -221,6 +288,39 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
}
/*
* SyntheticShardIntervalArrayForShardMinValues returns a shard interval pointer
* array whose min values come from the input shardMinValues array. Note that we
* simply calculate each shard's max value by decrementing the following shard's
* min value by one.
*
* The function only fills the min/max values of the shard intervals. Thus, it
* should not be used for general purpose operations.
*/
static ShardInterval **
SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount)
{
int shardIndex = 0;
Datum nextShardMaxValue = Int32GetDatum(INT32_MAX);
ShardInterval **syntheticShardIntervalArray =
palloc(sizeof(ShardInterval *) * shardCount);
for (shardIndex = shardCount - 1; shardIndex >= 0; --shardIndex)
{
Datum currentShardMinValue = shardMinValues[shardIndex];
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
shardInterval->minValue = currentShardMinValue;
shardInterval->maxValue = nextShardMaxValue;
nextShardMaxValue = Int32GetDatum(DatumGetInt32(currentShardMinValue) - 1);
syntheticShardIntervalArray[shardIndex] = shardInterval;
}
return syntheticShardIntervalArray;
}
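
The same derivation in SQL, assuming the four uniform min values from the example above: each shard's max is the following shard's min minus one, and the last shard's max is INT32_MAX:

    SELECT min_value,
           coalesce(lead(min_value) OVER (ORDER BY min_value) - 1,
                    2147483647) AS max_value
    FROM unnest(ARRAY[-2147483648, -1073741824, 0, 1073741824]) AS min_value;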
/*
* GetFunctionInfo first resolves the operator for the given data type, access
* method, and support procedure. The function then uses the resolved operator's
@ -1156,6 +1256,52 @@ RangePartitionId(Datum partitionValue, const void *context)
/*
* HashPartitionId determines the partition number for the given data value
* using hash partitioning. More specifically, the function returns zero if the
* given data value is null. If not, the function follows exactly the same
* approach as the Citus distributed planner.
*/
static uint32
HashPartitionId(Datum partitionValue, const void *context)
{
HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
FmgrInfo *hashFunction = hashPartitionContext->hashFunction;
uint32 partitionCount = hashPartitionContext->partitionCount;
ShardInterval **syntheticShardIntervalArray =
hashPartitionContext->syntheticShardIntervalArray;
FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction;
Datum hashDatum = FunctionCall1(hashFunction, partitionValue);
int32 hashResult = 0;
uint32 hashPartitionId = 0;
if (hashDatum == 0)
{
return hashPartitionId;
}
if (hashPartitionContext->hasUniformHashDistribution)
{
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount;
hashResult = DatumGetInt32(hashDatum);
hashPartitionId = (uint32) (hashResult - INT32_MIN) / hashTokenIncrement;
}
else
{
hashPartitionId =
SearchCachedShardInterval(hashDatum, syntheticShardIntervalArray,
partitionCount, comparisonFunction);
}
return hashPartitionId;
}
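
To make the uniform branch concrete, the same computation in SQL for an integer value and 4 partitions (hashint4 is the standard int4 hash function that GetFunctionInfo resolves via HASHSTANDARD_PROC; 1073741824 is hashTokenIncrement):

    SELECT (hashint4(42)::bigint - (-2147483648)) / 1073741824 AS partition_id;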
/*
* HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility
* between Citus version 7.4 and older versions.
*
* HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value
* using hash partitioning. More specifically, the function returns zero if the
* given data value is null. If not, the function applies the standard Postgres
* hashing function for the given data type, and mods the hashed result with the
* number of partitions. The function then returns the modded number as the
@ -1166,7 +1312,7 @@ RangePartitionId(Datum partitionValue, const void *context)
* see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4."
*/
static uint32
HashPartitionId(Datum partitionValue, const void *context)
HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context)
{
HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
FmgrInfo *hashFunction = hashPartitionContext->hashFunction;

View File

@ -90,6 +90,8 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void FlushDistTableCache(void);
extern void InvalidateMetadataSystemCache(void);
extern Datum DistNodeMetadata(void);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);

View File

@ -30,9 +30,10 @@ typedef enum JoinRuleType
JOIN_RULE_INVALID_FIRST = 0,
REFERENCE_JOIN = 1,
LOCAL_PARTITION_JOIN = 2,
SINGLE_PARTITION_JOIN = 3,
DUAL_PARTITION_JOIN = 4,
CARTESIAN_PRODUCT = 5,
SINGLE_HASH_PARTITION_JOIN = 3,
SINGLE_RANGE_PARTITION_JOIN = 4,
DUAL_PARTITION_JOIN = 5,
CARTESIAN_PRODUCT = 6,
/*
* Add new join rule types above this comment. After adding, you must also
@ -75,6 +76,7 @@ typedef struct JoinOrderNode
/* Config variables managed via guc.c */
extern bool LogMultiJoinOrder;
extern bool EnableSingleHashRepartitioning;
/* Function declaration for determining table join orders */

View File

@ -38,7 +38,7 @@
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %d)"
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
(" UINT64_FORMAT ", %d, '%s', '%s')"
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \
@ -69,7 +69,8 @@ typedef enum
{
PARTITION_INVALID_FIRST = 0,
RANGE_PARTITION_TYPE = 1,
HASH_PARTITION_TYPE = 2
SINGLE_HASH_PARTITION_TYPE = 2,
DUAL_HASH_PARTITION_TYPE = 3
} PartitionType;
@ -323,6 +324,8 @@ extern int CompareShardPlacements(const void *leftElement, const void *rightElem
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);

View File

@ -36,6 +36,10 @@ extern int ShardIndex(ShardInterval *shardInterval);
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
DistTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
extern int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
extern bool SingleReplicatedTable(Oid relationId);
#endif /* SHARDINTERVAL_UTILS_H_ */

View File

@ -15,6 +15,7 @@
#define WORKER_PROTOCOL_H
#include "fmgr.h"
#include "distributed/shardinterval_utils.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "storage/fd.h"
@ -73,7 +74,10 @@ typedef struct RangePartitionContext
typedef struct HashPartitionContext
{
FmgrInfo *hashFunction;
FmgrInfo *comparisonFunction;
ShardInterval **syntheticShardIntervalArray;
uint32 partitionCount;
bool hasUniformHashDistribution;
} HashPartitionContext;

View File

@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2';
ALTER EXTENSION citus UPDATE TO '7.2-3';
ALTER EXTENSION citus UPDATE TO '7.3-3';
ALTER EXTENSION citus UPDATE TO '7.4-1';
ALTER EXTENSION citus UPDATE TO '7.4-2';
ALTER EXTENSION citus UPDATE TO '7.4-3';
-- show running version
SHOW citus.version;
citus.version

View File

@ -103,7 +103,7 @@ ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
EXPLAIN SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ]
LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
@ -161,7 +161,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
-- range partitioned one.
EXPLAIN SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ single partition join "customer_append" ]
LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)

View File

@ -51,7 +51,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ]
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ]
QUERY PLAN
------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
@ -94,7 +94,7 @@ GROUP BY
c_comment
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ][ reference join "nation" ]
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ]
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
@ -135,7 +135,7 @@ WHERE
AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK')
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ single partition join "part_append" ]
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
@ -154,7 +154,7 @@ WHERE
c_custkey = o_custkey
GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part_append" ][ single partition join "customer_append" ]
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ]
QUERY PLAN
--------------------------------------------------------------------------
HashAggregate (cost=0.00..0.00 rows=0 width=0)

View File

@ -967,14 +967,15 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
WHERE a.id = b.id AND a.author_id = 1
ORDER BY 1 DESC;
id | author_id | title | word_count | id | author_id | title | word_count
----+-----------+--------------+------------+----+-----------+--------------+------------
41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814
11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347
31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271
1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572
21 | 1 | arcading | 5890 | 21 | 1 | arcading | 5890
11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347
1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572
(5 rows)
SET citus.enable_repartition_joins TO OFF;
@ -1779,15 +1780,15 @@ DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 9
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 15
DETAIL: Creating dependency on merge taskId 8
ERROR: the query contains a join that requires repartitioning

View File

@ -41,7 +41,7 @@ SET citus.shard_replication_factor to 1;
-- for interval [8,10] repartition join logic will be triggered.
SET citus.enable_repartition_joins to ON;
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ]
LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ]
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
count

View File

@ -0,0 +1,465 @@
--
-- MULTI_SINGLE_HASH_REPARTITION_JOIN
--
CREATE SCHEMA single_hash_repartition;
SET search_path TO 'single_hash_repartition';
SET citus.enable_single_hash_repartition_joins TO ON;
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
CREATE TABLE ref_table (id int, sum int, avg float);
SET citus.shard_count TO 4;
SELECT create_distributed_table('single_hash_repartition_first', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('single_hash_repartition_second', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_reference_table('ref_table');
create_reference_table
------------------------
(1 row)
SET citus.log_multi_join_order TO ON;
SET client_min_messages TO DEBUG2;
-- a very basic single hash re-partitioning example
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- the same query with the order of the tables changed
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
t2.sum = t1.id;
LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- single hash repartition after bcast joins
EXPLAIN SELECT
count(*)
FROM
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
r1.id = t1.id AND t2.sum = t1.id;
LOG: join order: [ "single_hash_repartition_second" ][ reference join "ref_table" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- a more complicated join order, first colocated join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- a more complicated join order, first hash-repartition join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.sum = t2.sum AND t1.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ dual partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 24
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- single hash repartitioning is not supported between different column types
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.avg = t3.id;
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ cartesian product "single_hash_repartition_second" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
-- single repartition query in CTE
-- should work fine
EXPLAIN WITH cte1 AS
(
SELECT
t1.id * t2.avg as data
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum
AND t1.sum > 5000
ORDER BY 1 DESC
LIMIT 50
)
SELECT
count(*)
FROM
cte1, single_hash_repartition_first
WHERE
cte1.data > single_hash_repartition_first.id;
DEBUG: generating subplan 7_1 for CTE cte1: SELECT ((t1.id)::double precision * t2.avg) AS data FROM single_hash_repartition.single_hash_repartition_first t1, single_hash_repartition.single_hash_repartition_second t2 WHERE ((t1.id = t2.sum) AND (t1.sum > 5000)) ORDER BY ((t1.id)::double precision * t2.avg) DESC LIMIT 50
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: push down of limit count: 50
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- two single repartitions
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 24
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- two single repartitions again, but this
-- time the columns of the second join are reversed
EXPLAIN SELECT
avg(t1.avg + t2.avg)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.id = t3.sum
LIMIT 10;
LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: push down of limit count: 10
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 24
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- let's try one more thing: show that a non-uniform shard distribution
-- still ends up with single hash repartitioning
UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [-1073741824,-2]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [0,1073741822]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [1073741824,2147483646]
DEBUG: join prunable for intervals [-1073741824,-2] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [-1073741824,-2] and [0,1073741822]
DEBUG: join prunable for intervals [-1073741824,-2] and [1073741824,2147483646]
DEBUG: join prunable for intervals [0,1073741822] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [0,1073741822] and [-1073741824,-2]
DEBUG: join prunable for intervals [0,1073741822] and [1073741824,2147483646]
DEBUG: join prunable for intervals [1073741824,2147483646] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [1073741824,2147483646] and [-1073741824,-2]
DEBUG: join prunable for intervals [1073741824,2147483646] and [0,1073741822]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.sum = t2.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
RESET client_min_messages;
RESET search_path;
DROP SCHEMA single_hash_repartition CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table single_hash_repartition.single_hash_repartition_first
drop cascades to table single_hash_repartition.single_hash_repartition_second
drop cascades to table single_hash_repartition.ref_table
SET citus.enable_single_hash_repartition_joins TO OFF;
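A note on the two settings involved here: citus.enable_single_hash_repartition_joins makes the planner consider the single hash partition join order, while citus.enable_repartition_joins must also be on for a repartitioning plan to be allowed at all, which is why the EXPLAIN statements above end in an error with the corresponding hint. A minimal session that actually runs one of the joins above might look like this sketch:

SET search_path TO 'single_hash_repartition';
SET citus.enable_single_hash_repartition_joins TO ON;
SET citus.enable_repartition_joins TO ON;
-- the join below is the basic example from the top of this test
SELECT count(*)
FROM single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE t1.id = t2.sum;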

View File

@ -46,7 +46,7 @@ ERROR: partition column type 20 and split point type 25 do not match
-- Check that we fail with bad partition column type on hash partitioning
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Name, :Bad_Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
ERROR: partition column types 25 and 20 do not match
-- Now, partition table data using valid arguments
SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
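For context on the argument replaced in this hunk: worker_hash_partition_table no longer takes an integer partition count as its last argument, but an array of hash-token split points, one element per partition, each marking the lower bound of that partition's token range (compare the array elements with the shard intervals in the planner output above). With the test's psql variables expanded, a call with valid arguments would look roughly like the sketch below; the job and task ids are taken from the COPY paths used elsewhere in these tests.

-- hypothetical expansion of the test's psql variables
SELECT worker_hash_partition_table(201010, 101103, 'SELECT * FROM lineitem',
                                   'l_orderkey', 'int8'::regtype,
                                   ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);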

View File

@ -7,15 +7,11 @@
\set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type '\'int8\''
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Query_Text '\'SELECT * FROM lineitem\''
\set Select_All 'SELECT *'
-- Hash functions internally return unsigned 32-bit integers. However, when
-- called externally, the return value becomes a signed 32-bit integer. We hack
-- around this conversion issue by bitwise-anding the hash results. Note that
-- this only works because we are modding with 4. The proper Hash_Mod_Function
-- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
-- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
-- The hash function is mapped to behave exactly as the Citus planner does
\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_part_00
\set Table_Part_01 lineitem_hash_part_01
\set Table_Part_02 lineitem_hash_part_02
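The new Hash_Mod_Function above shifts the signed 32-bit hash value by 2147483648, mapping it into [0, 4294967295], and then integer-divides by the token increment 1073741824 (2^30), so bucket i corresponds exactly to the i-th shard interval in the planner output earlier in this diff. A quick, hypothetical boundary check:

-- each boundary hash value should land in the expected bucket (0..3),
-- matching the intervals [-2147483648,-1073741825], [-1073741824,-1],
-- [0,1073741823] and [1073741824,2147483647]
SELECT h AS hash_value,
       (h::int8 - (-2147483648))::int8 / 1073741824 AS bucket
FROM (VALUES (-2147483648), (-1073741825), (-1073741824), (-1),
             (0), (1073741823), (1073741824), (2147483647)) AS t(h);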
@ -23,7 +19,7 @@
-- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -36,13 +32,25 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
count
-------
3081
2885
(1 row)
SELECT COUNT(*) FROM :Table_Part_01;
count
-------
3009
(1 row)
SELECT COUNT(*) FROM :Table_Part_02;
count
-------
3104
(1 row)
SELECT COUNT(*) FROM :Table_Part_03;
count
-------
2935
3002
(1 row)
-- We first compute the difference of partition tables against the base table.

View File

@ -7,9 +7,10 @@
\set Partition_Column_Text '\'l_partkey\''
\set Partition_Column_Type 23
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
\set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_complex_part_00
\set Table_Part_01 lineitem_hash_complex_part_01
\set Table_Part_02 lineitem_hash_complex_part_02
@ -22,7 +23,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
' WHERE l_shipdate >= date ''1992-01-15'''
' AND l_discount between 0.02 AND 0.08',
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -36,13 +37,13 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
count
-------
1988
1883
(1 row)
SELECT COUNT(*) FROM :Table_Part_03;
count
-------
1881
1913
(1 row)
-- We first compute the difference of partition tables against the base table.
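The difference checks referenced by this comment are elided from the hunk; in spirit they take a set difference between each partition table and the matching slice of the base table. A hypothetical check for the 0th bucket, reusing the filters and hash expression defined above:

-- hypothetical difference check for the 0th bucket; a non-zero count
-- would indicate rows in the partition file that the base table slice lacks
SELECT COUNT(*) AS diff_lhs_00 FROM (
    SELECT * FROM lineitem_hash_complex_part_00
    EXCEPT ALL
    SELECT l_partkey, l_discount, l_shipdate, l_comment FROM lineitem
    WHERE l_shipdate >= date '1992-01-15'
      AND l_discount between 0.02 AND 0.08
      AND ( hashint4(l_partkey)::int8 - (-2147483648))::int8 / 1073741824 = 0
) diff;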

View File

@ -98,14 +98,15 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
-- into the 0th repartition bucket.
\set Hash_TaskId 101107
\set Partition_Count 4
\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set hashTokenIncrement 1073741824
\set Hash_Table_Part_00 supplier_hash_part_00
\set Hash_Table_Part_01 supplier_hash_part_01
\set Hash_Table_Part_02 supplier_hash_part_02
-- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -117,13 +118,13 @@ COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_000
SELECT COUNT(*) FROM :Hash_Table_Part_00;
count
-------
298
282
(1 row)
SELECT COUNT(*) FROM :Hash_Table_Part_02;
count
-------
203
102
(1 row)
-- We first compute the difference of partition tables against the base table.

View File

@ -63,7 +63,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_partition_pruning
test: multi_partition_pruning single_hash_repartition_join
test: multi_join_pruning multi_hash_pruning
test: multi_null_minmax_value_pruning
test: multi_query_directory_cleanup

View File

@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2';
ALTER EXTENSION citus UPDATE TO '7.2-3';
ALTER EXTENSION citus UPDATE TO '7.3-3';
ALTER EXTENSION citus UPDATE TO '7.4-1';
ALTER EXTENSION citus UPDATE TO '7.4-2';
ALTER EXTENSION citus UPDATE TO '7.4-3';
-- show running version
SHOW citus.version;

View File

@ -461,7 +461,8 @@ SELECT *
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
WHERE a.id = b.id AND a.author_id = 1
ORDER BY 1 DESC;
SET citus.enable_repartition_joins TO OFF;
-- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor

View File

@ -0,0 +1,143 @@
--
-- MULTI_SINGLE_HASH_REPARTITION_JOIN
--
CREATE SCHEMA single_hash_repartition;
SET search_path TO 'single_hash_repartition';
SET citus.enable_single_hash_repartition_joins TO ON;
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
CREATE TABLE ref_table (id int, sum int, avg float);
SET citus.shard_count TO 4;
SELECT create_distributed_table('single_hash_repartition_first', 'id');
SELECT create_distributed_table('single_hash_repartition_second', 'id');
SELECT create_reference_table('ref_table');
SET citus.log_multi_join_order TO ON;
SET client_min_messages TO DEBUG2;
-- a very basic single hash re-partitioning example
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
-- the same query with the order of the tables changed
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
t2.sum = t1.id;
-- single hash repartition after bcast joins
EXPLAIN SELECT
count(*)
FROM
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
r1.id = t1.id AND t2.sum = t1.id;
-- a more complicated join order, first colocated join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.sum = t3.id;
-- a more complicated join order, first hash-repartition join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.sum = t2.sum AND t1.sum = t3.id;
-- single hash repartitioning is not supported between different column types
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.avg = t3.id;
-- single repartition query in CTE
-- should work fine
EXPLAIN WITH cte1 AS
(
SELECT
t1.id * t2.avg as data
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum
AND t1.sum > 5000
ORDER BY 1 DESC
LIMIT 50
)
SELECT
count(*)
FROM
cte1, single_hash_repartition_first
WHERE
cte1.data > single_hash_repartition_first.id;
-- two single repartitions
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.sum = t3.id;
-- two single repartitions again, but this
-- time the columns of the second join are reversed
EXPLAIN SELECT
avg(t1.avg + t2.avg)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.id = t3.sum
LIMIT 10;
-- let's try one more thing: show that a non-uniform shard distribution
-- still ends up with single hash repartitioning
UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.sum = t2.id;
RESET client_min_messages;
RESET search_path;
DROP SCHEMA single_hash_repartition CASCADE;
SET citus.enable_single_hash_repartition_joins TO OFF;

View File

@ -54,7 +54,7 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Name, :Bad_Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
-- Now, partition table data using valid arguments

View File

@ -10,18 +10,12 @@
\set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type '\'int8\''
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Query_Text '\'SELECT * FROM lineitem\''
\set Select_All 'SELECT *'
-- Hash functions internally return unsigned 32-bit integers. However, when
-- called externally, the return value becomes a signed 32-bit integer. We hack
-- around this conversion issue by bitwise-anding the hash results. Note that
-- this only works because we are modding with 4. The proper Hash_Mod_Function
-- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
-- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
-- The hash function is mapped to behave exactly as the Citus planner does
\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_part_00
\set Table_Part_01 lineitem_hash_part_01
@ -32,7 +26,7 @@
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';
COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001';
@ -40,6 +34,8 @@ COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002';
COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
SELECT COUNT(*) FROM :Table_Part_01;
SELECT COUNT(*) FROM :Table_Part_02;
SELECT COUNT(*) FROM :Table_Part_03;
-- We first compute the difference of partition tables against the base table.

View File

@ -10,11 +10,12 @@
\set Partition_Column_Text '\'l_partkey\''
\set Partition_Column_Type 23
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
\set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_complex_part_00
\set Table_Part_01 lineitem_hash_complex_part_01
@ -30,7 +31,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
' WHERE l_shipdate >= date ''1992-01-15'''
' AND l_discount between 0.02 AND 0.08',
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
-- Copy partitioned data files into tables for testing purposes

View File

@ -69,7 +69,8 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
\set Hash_TaskId 101107
\set Partition_Count 4
\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set hashTokenIncrement 1073741824
\set Hash_Table_Part_00 supplier_hash_part_00
\set Hash_Table_Part_01 supplier_hash_part_01
@ -79,7 +80,7 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000';
COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';