Implement single repartitioning on hash distributed tables

* Change worker_hash_partition_table() such that it assigns
     rows to partitions the same way Citus planner's hashing
     does, removing the divergence between the two.

   * Rename single partitioning to single range partitioning.

   * Add single hash repartitioning. Basically, logical planner
     treats single hash and range partitioning almost equally.
     Physical planner, on the other hand, treats single hash and
     dual hash repartitioning almost equally (except for JoinPruning).

   * Add a new GUC to enable this feature
pull/2075/head
Onder Kalaci 2018-03-29 09:44:00 +03:00
parent 7850f93127
commit 317dd02a2f
33 changed files with 1094 additions and 115 deletions

View File

@ -15,7 +15,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.1-1 7.1-2 7.1-3 7.1-4 \
7.2-1 7.2-2 7.2-3 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2
7.4-1 7.4-2 7.4-3
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -197,6 +197,8 @@ $(EXTENSION)--7.4-1.sql: $(EXTENSION)--7.3-3.sql $(EXTENSION)--7.3-3--7.4-1.sql
cat $^ > $@
$(EXTENSION)--7.4-2.sql: $(EXTENSION)--7.4-1.sql $(EXTENSION)--7.4-1--7.4-2.sql
cat $^ > $@
$(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,13 @@
/* citus--7.4-2--7.4-3 */
-- Upgrade script from extension version 7.4-2 to 7.4-3.
-- Create the new objects in pg_catalog; the search_path is restored at the end.
SET search_path = 'pg_catalog';
-- note that we're not dropping the older version of the function
-- This adds an overload of worker_hash_partition_table() whose sixth argument
-- is an array (anyarray) instead of an integer. Both overloads resolve to the
-- same C symbol, which inspects the type of the sixth argument at run time.
CREATE FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, anyarray)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_hash_partition_table$$;
-- Attach a description to the new overload.
COMMENT ON FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid,
anyarray)
IS 'hash partition query results';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.4-2'
default_version = '7.4-3'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -35,6 +35,7 @@
/* Config variables managed via guc.c */
bool LogMultiJoinOrder = false; /* print join order as a debugging aid */
bool EnableSingleHashRepartitioning = false;
/* Function pointer type definition for join rule evaluation functions */
typedef JoinOrderNode *(*RuleEvalFunction) (JoinOrderNode *currentJoinNode,
@ -178,7 +179,7 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode,
nextTable, joinClauseList, joinType);
if (nextJoinNode->joinRuleType >= SINGLE_PARTITION_JOIN)
if (nextJoinNode->joinRuleType >= SINGLE_HASH_PARTITION_JOIN)
{
/* re-partitioning for OUTER joins is not implemented */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -669,7 +670,8 @@ LargeDataTransferLocation(List *joinOrder)
JoinRuleType joinRuleType = joinOrderNode->joinRuleType;
/* we consider the following join rules to cause large data transfers */
if (joinRuleType == SINGLE_PARTITION_JOIN ||
if (joinRuleType == SINGLE_HASH_PARTITION_JOIN ||
joinRuleType == SINGLE_RANGE_PARTITION_JOIN ||
joinRuleType == DUAL_PARTITION_JOIN ||
joinRuleType == CARTESIAN_PRODUCT)
{
@ -862,7 +864,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType)
{
RuleEvalFunctionArray[REFERENCE_JOIN] = &ReferenceJoin;
RuleEvalFunctionArray[LOCAL_PARTITION_JOIN] = &LocalJoin;
RuleEvalFunctionArray[SINGLE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct;
@ -888,7 +891,10 @@ JoinRuleName(JoinRuleType ruleType)
/* use strdup() to be independent of memory contexts */
RuleNameArray[REFERENCE_JOIN] = strdup("reference join");
RuleNameArray[LOCAL_PARTITION_JOIN] = strdup("local partition join");
RuleNameArray[SINGLE_PARTITION_JOIN] = strdup("single partition join");
RuleNameArray[SINGLE_HASH_PARTITION_JOIN] =
strdup("single hash partition join");
RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] =
strdup("single range partition join");
RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join");
RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product");
@ -1043,6 +1049,9 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
Var *currentPartitionColumn = currentJoinNode->partitionColumn;
char currentPartitionMethod = currentJoinNode->partitionMethod;
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType;
OpExpr *joinClause = NULL;
Oid relationId = candidateTable->relationId;
uint32 tableId = candidateTable->rangeTableId;
@ -1056,22 +1065,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
/*
* If we previously dual-hash re-partitioned the tables for a join, we
* currently don't allow a single-repartition join.
* If we previously dual-hash re-partitioned the tables for a join or made
* cartesian product, we currently don't allow a single-repartition join.
*/
if (currentPartitionMethod == REDISTRIBUTE_BY_HASH)
if (currentJoinRuleType == DUAL_PARTITION_JOIN ||
currentJoinRuleType == CARTESIAN_PRODUCT)
{
return NULL;
}
if (currentPartitionMethod != DISTRIBUTE_BY_HASH)
joinClause =
SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses);
if (joinClause != NULL)
{
OpExpr *joinClause = SinglePartitionJoinClause(currentPartitionColumn,
applicableJoinClauses);
if (joinClause != NULL)
if (currentPartitionMethod == DISTRIBUTE_BY_HASH)
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN,
/*
* Single hash repartitioning may perform worse than dual hash
* repartitioning. Thus, we control it via a guc.
*/
if (!EnableSingleHashRepartitioning)
{
return NULL;
}
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionMethod,
currentAnchorTable);
}
else
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionMethod,
currentAnchorTable);
@ -1079,18 +1104,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
/* evaluate re-partitioning the current table only if the rule didn't apply above */
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH &&
candidatePartitionMethod != DISTRIBUTE_BY_NONE)
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_NONE)
{
OpExpr *joinClause = SinglePartitionJoinClause(candidatePartitionColumn,
applicableJoinClauses);
if (joinClause != NULL)
{
nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionMethod,
candidateTable);
if (candidatePartitionMethod == DISTRIBUTE_BY_HASH)
{
/*
* Single hash repartitioning may perform worse than dual hash
* repartitioning. Thus, we control it via a guc.
*/
if (!EnableSingleHashRepartitioning)
{
return NULL;
}
nextJoinNode = MakeJoinOrderNode(candidateTable,
SINGLE_HASH_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionMethod,
candidateTable);
}
else
{
nextJoinNode = MakeJoinOrderNode(candidateTable,
SINGLE_RANGE_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionMethod,
candidateTable);
}
}
}

View File

@ -94,7 +94,15 @@ static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
static MultiNode * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses);
static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses);
static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
@ -2208,7 +2216,10 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
{
RuleApplyFunctionArray[REFERENCE_JOIN] = &ApplyReferenceJoin;
RuleApplyFunctionArray[LOCAL_PARTITION_JOIN] = &ApplyLocalJoin;
RuleApplyFunctionArray[SINGLE_PARTITION_JOIN] = &ApplySinglePartitionJoin;
RuleApplyFunctionArray[SINGLE_HASH_PARTITION_JOIN] =
&ApplySingleHashPartitionJoin;
RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] =
&ApplySingleRangePartitionJoin;
RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct;
@ -2264,12 +2275,50 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
}
/*
 * ApplySingleRangePartitionJoin delegates the construction of the join node
 * to ApplySinglePartitionJoin() and then marks the resulting node as a
 * single range partition join before handing it back to the caller.
 */
static MultiNode *
ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
							  Var *partitionColumn, JoinType joinType,
							  List *applicableJoinClauses)
{
	MultiJoin *rangeJoinNode = NULL;

	/* build the generic single partition join first */
	rangeJoinNode = ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn,
											 joinType, applicableJoinClauses);

	/* the shared helper leaves the rule type unset, so tag it here */
	rangeJoinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN;

	return (MultiNode *) rangeJoinNode;
}
/*
 * ApplySingleHashPartitionJoin delegates the construction of the join node
 * to ApplySinglePartitionJoin() and then marks the resulting node as a
 * single hash partition join before handing it back to the caller.
 */
static MultiNode *
ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
							 Var *partitionColumn, JoinType joinType,
							 List *applicableJoinClauses)
{
	MultiJoin *hashJoinNode = NULL;

	/* build the generic single partition join first */
	hashJoinNode = ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn,
											joinType, applicableJoinClauses);

	/* the shared helper leaves the rule type unset, so tag it here */
	hashJoinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN;

	return (MultiNode *) hashJoinNode;
}
/*
* ApplySinglePartitionJoin creates a new MultiJoin node that joins the left and
* right node. The function also adds a MultiPartition node on top of the node
* (left or right) that is not partitioned on the join column.
*/
static MultiNode *
static MultiJoin *
ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
@ -2337,11 +2386,10 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
}
/* finally set join operator fields */
joinNode->joinRuleType = SINGLE_PARTITION_JOIN;
joinNode->joinType = joinType;
joinNode->joinClauseList = applicableJoinClauses;
return (MultiNode *) joinNode;
return joinNode;
}

View File

@ -14,6 +14,7 @@
#include "postgres.h"
#include <math.h>
#include <stdint.h>
#include "miscadmin.h"
@ -320,7 +321,8 @@ BuildJobTree(MultiTreeRoot *multiTree)
if (currentNodeType == T_MultiJoin)
{
MultiJoin *joinNode = (MultiJoin *) currentNode;
if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN ||
if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN ||
joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN ||
joinNode->joinRuleType == DUAL_PARTITION_JOIN)
{
boundaryNodeJobType = JOIN_MAP_MERGE_JOB;
@ -350,14 +352,19 @@ BuildJobTree(MultiTreeRoot *multiTree)
PartitionType partitionType = PARTITION_INVALID_FIRST;
Oid baseRelationId = InvalidOid;
if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN)
if (joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN)
{
partitionType = RANGE_PARTITION_TYPE;
baseRelationId = RangePartitionJoinBaseRelationId(joinNode);
}
else if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN)
{
partitionType = SINGLE_HASH_PARTITION_TYPE;
baseRelationId = RangePartitionJoinBaseRelationId(joinNode);
}
else if (joinNode->joinRuleType == DUAL_PARTITION_JOIN)
{
partitionType = HASH_PARTITION_TYPE;
partitionType = DUAL_HASH_PARTITION_TYPE;
}
if (CitusIsA(leftChildNode, MultiPartition))
@ -411,7 +418,8 @@ BuildJobTree(MultiTreeRoot *multiTree)
Query *jobQuery = BuildJobQuery(queryNode, dependedJobList);
MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList,
partitionKey, HASH_PARTITION_TYPE,
partitionKey,
DUAL_HASH_PARTITION_TYPE,
InvalidOid,
SUBQUERY_MAP_MERGE_JOB);
@ -1849,21 +1857,20 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
* If join type is not set, this means this job represents a subquery, and
* uses hash partitioning.
*/
if (partitionType == HASH_PARTITION_TYPE)
if (partitionType == DUAL_HASH_PARTITION_TYPE)
{
uint32 partitionCount = HashPartitionCount();
mapMergeJob->partitionType = HASH_PARTITION_TYPE;
mapMergeJob->partitionType = DUAL_HASH_PARTITION_TYPE;
mapMergeJob->partitionCount = partitionCount;
}
else if (partitionType == RANGE_PARTITION_TYPE)
else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType ==
RANGE_PARTITION_TYPE)
{
/* build the split point object for the table on the right-hand side */
DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId);
bool hasUninitializedShardInterval = false;
uint32 shardCount = cache->shardIntervalArrayLength;
ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray;
char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY = 0;
hasUninitializedShardInterval = cache->hasUninitializedShardInterval;
if (hasUninitializedShardInterval)
@ -1872,12 +1879,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
"missing min/max values")));
}
basePartitionMethod = PartitionMethod(baseRelationId);
/* this join-type currently doesn't work for hash partitioned tables */
Assert(basePartitionMethod != DISTRIBUTE_BY_HASH);
mapMergeJob->partitionType = RANGE_PARTITION_TYPE;
mapMergeJob->partitionType = partitionType;
mapMergeJob->partitionCount = shardCount;
mapMergeJob->sortedShardIntervalArray = sortedShardIntervalArray;
mapMergeJob->sortedShardIntervalArrayLength = shardCount;
@ -2729,7 +2731,7 @@ DependsOnHashPartitionJob(Job *job)
if (CitusIsA(dependedJob, MapMergeJob))
{
MapMergeJob *mapMergeJob = (MapMergeJob *) dependedJob;
if (mapMergeJob->partitionType == HASH_PARTITION_TYPE)
if (mapMergeJob->partitionType == DUAL_HASH_PARTITION_TYPE)
{
dependsOnHashPartitionJob = true;
}
@ -3758,6 +3760,7 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment
Task *leftMergeTask = (Task *) leftFragment->fragmentReference;
Task *rightMergeTask = (Task *) rightFragment->fragmentReference;
if (leftMergeTask->partitionId != rightMergeTask->partitionId)
{
ereport(DEBUG2, (errmsg("join prunable for task partitionId %u and %u",
@ -3771,8 +3774,9 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment
}
}
/*
* We have a range (re)partition join. We now get shard intervals for both
* We have a single (re)partition join. We now get shard intervals for both
* fragments, and then check if these intervals overlap.
*/
leftFragmentInterval = FragmentInterval(leftFragment);
@ -4261,13 +4265,32 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, splitPointString->data);
}
else if (partitionType == SINGLE_HASH_PARTITION_TYPE)
{
ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray;
uint32 intervalCount = mapMergeJob->partitionCount;
ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount);
StringInfo splitPointString = SplitPointArrayString(splitPointObject,
partitionColumnType,
partitionColumnTypeMod);
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, splitPointString->data);
}
else
{
uint32 partitionCount = mapMergeJob->partitionCount;
ShardInterval **intervalArray =
GenerateSyntheticShardIntervalArray(partitionCount);
ArrayType *splitPointObject = SplitPointObject(intervalArray,
mapMergeJob->partitionCount);
StringInfo splitPointString =
SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID));
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName,
partitionColumnTypeFullName, partitionCount);
partitionColumnTypeFullName, splitPointString->data);
}
/* convert filter query task into map task */
@ -4282,6 +4305,46 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
}
/*
* GenerateSyntheticShardIntervalArray returns a shard interval pointer array
* which has a uniform hash distribution for the given input partitionCount.
*
* The function only fills the min/max values of shard the intervals. Thus, should
* not be used for general purpose operations.
*/
ShardInterval **
GenerateSyntheticShardIntervalArray(int partitionCount)
{
ShardInterval **shardIntervalArray = palloc0(partitionCount *
sizeof(ShardInterval *));
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount;
int shardIndex = 0;
for (shardIndex = 0; shardIndex < partitionCount; ++shardIndex)
{
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
/* calculate the split of the hash space */
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
shardInterval->relationId = InvalidOid;
shardInterval->minValueExists = true;
shardInterval->minValue = Int32GetDatum(shardMinHashToken);
shardInterval->maxValueExists = true;
shardInterval->maxValue = Int32GetDatum(shardMaxHashToken);
shardInterval->shardId = INVALID_SHARD_ID;
shardInterval->valueTypeId = INT4OID;
shardIntervalArray[shardIndex] = shardInterval;
}
return shardIntervalArray;
}
/*
* ColumnName resolves the given column's name. The given column could belong to
* a regular table or to an intermediate table formed to execute a distributed
@ -4396,6 +4459,10 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
initialPartitionId = 1;
partitionCount = partitionCount + 1;
}
else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE)
{
initialPartitionId = 0;
}
/* build merge tasks and their associated "map output fetch" tasks */
for (partitionId = initialPartitionId; partitionId < partitionCount; partitionId++)
@ -4463,7 +4530,7 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
/* merge task depends on completion of fetch tasks */
mergeTask->dependedTaskList = mapOutputFetchTaskList;
/* if range repartitioned, each merge task represents an interval */
/* if single repartitioned, each merge task represents an interval */
if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE)
{
int32 mergeTaskIntervalId = partitionId - 1;
@ -4472,6 +4539,14 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId];
}
else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE)
{
int32 mergeTaskIntervalId = partitionId;
ShardInterval **mergeTaskIntervals = mapMergeJob->sortedShardIntervalArray;
Assert(mergeTaskIntervalId >= 0);
mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId];
}
mergeTaskList = lappend(mergeTaskList, mergeTask);
}

View File

@ -373,6 +373,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_single_hash_repartition_joins",
gettext_noop("Enables single hash repartitioning between hash "
"distributed tables"),
NULL,
&EnableSingleHashRepartitioning,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Enables supported subquery pushdown to workers."),

View File

@ -168,8 +168,6 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static void PrepareWorkerNodeCache(void);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
@ -1181,7 +1179,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
* has a uniform hash distribution, as produced by master_create_worker_shards for
* hash partitioned tables.
*/
static bool
bool
HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength)
{

View File

@ -25,11 +25,6 @@
#include "utils/memutils.h"
static int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
/*
* LowestShardIntervalById returns the shard interval with the lowest shard
* ID from a list of shard intervals.
@ -332,8 +327,17 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry)
* matching a given partition column value and returns its index in the cached
* array. If it can not find any shard interval with the given value, it returns
* INVALID_SHARD_INDEX.
*
* TODO: Data re-partitioning logic (e.g., worker_hash_partition_table())
* on the worker nodes relies on this function in order to be consistent
* with shard pruning. Since the worker nodes don't have the metadata, a
* synthetically generated ShardInterval ** is passed to this
* function. The synthetic shard intervals contain only shardmin and shardmax
* values. A proper implementation of this approach should introduce an
* intermediate data structure (e.g., ShardRange) on which this function
* operates instead of operating on shard intervals.
*/
static int
int
SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction)
{

View File

@ -23,15 +23,18 @@
#include <sys/stat.h>
#include <math.h>
#include <unistd.h>
#include <stdint.h>
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
@ -53,6 +56,9 @@ static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */
/* Local functions forward declarations */
static ShardInterval ** SyntheticShardIntervalArrayForShardMinValues(
Datum *shardMinValues,
int shardCount);
static StringInfo InitTaskAttemptDirectory(uint64 jobId, uint32 taskId);
static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount);
static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount);
@ -73,6 +79,7 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static uint32 RangePartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionId(Datum partitionValue, const void *context);
static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context);
static bool FileIsLink(char *filename, struct stat filestat);
@ -178,28 +185,88 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
text *filterQueryText = PG_GETARG_TEXT_P(2);
text *partitionColumnText = PG_GETARG_TEXT_P(3);
Oid partitionColumnType = PG_GETARG_OID(4);
uint32 partitionCount = PG_GETARG_UINT32(5);
ArrayType *hashRangeObject = NULL;
const char *filterQuery = text_to_cstring(filterQueryText);
const char *partitionColumn = text_to_cstring(partitionColumnText);
HashPartitionContext *partitionContext = NULL;
FmgrInfo *hashFunction = NULL;
Datum *hashRangeArray = NULL;
int32 partitionCount = 0;
StringInfo taskDirectory = NULL;
StringInfo taskAttemptDirectory = NULL;
FileOutputStream *partitionFileArray = NULL;
uint32 fileCount = partitionCount;
uint32 fileCount = 0;
uint32 (*HashPartitionIdFunction)(Datum, const void *);
Oid partitionBucketOid = InvalidOid;
CheckCitusVersion(ERROR);
partitionContext = palloc0(sizeof(HashPartitionContext));
/*
* We do this hack for backward compatibility.
*
* In the older versions of Citus, worker_hash_partition_table()'s 6th parameter
* was an integer which denoted the number of buckets to split the shard's data.
* In the later versions of Citus, the sixth parameter is changed to get an array
* of shard ranges, which is used as the ranges to split the shard's data.
*
* Keeping this value is important if the coordinator's Citus version is <= 7.3
* and worker Citus version is > 7.3.
*/
partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5);
if (partitionBucketOid == INT4ARRAYOID)
{
hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);
hashRangeArray = DeconstructArrayObject(hashRangeObject);
partitionCount = ArrayObjectCount(hashRangeObject);
partitionContext->syntheticShardIntervalArray =
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);
partitionContext->hasUniformHashDistribution =
HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray,
partitionCount);
HashPartitionIdFunction = &HashPartitionId;
}
else if (partitionBucketOid == INT4OID)
{
partitionCount = PG_GETARG_UINT32(5);
partitionContext->syntheticShardIntervalArray =
GenerateSyntheticShardIntervalArray(partitionCount);
partitionContext->hasUniformHashDistribution = true;
HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI;
}
else
{
/* we should never get other type of parameters */
ereport(ERROR, (errmsg("unexpected parameter for "
"worker_hash_partition_table()")));
}
/* use column's type information to get the hashing function */
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC);
/* create hash partition context object */
partitionContext = palloc0(sizeof(HashPartitionContext));
/* we create as many files as the number of split points */
fileCount = partitionCount;
partitionContext->hashFunction = hashFunction;
partitionContext->partitionCount = partitionCount;
/* we'll use binary search, we need the comparison function */
if (!partitionContext->hasUniformHashDistribution)
{
partitionContext->comparisonFunction =
GetFunctionInfo(partitionColumnType, BTREE_AM_OID, BTORDER_PROC);
}
/* init directories and files to write the partitioned data to */
taskDirectory = InitTaskDirectory(jobId, taskId);
taskAttemptDirectory = InitTaskAttemptDirectory(jobId, taskId);
@ -209,7 +276,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
/* call the partitioning function that does the actual work */
FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
&HashPartitionId, (const void *) partitionContext,
HashPartitionIdFunction, (const void *) partitionContext,
partitionFileArray, fileCount);
/* close partition files and atomically rename (commit) them */
@ -221,6 +288,39 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
}
/*
 * SyntheticShardIntervalArrayForShardMinValues builds a shard interval pointer
 * array out of the given shardMinValues array. Each interval's min value is
 * taken from the input as is. Each interval's max value is one less than the
 * min value of the interval that follows it, and the last interval's max value
 * is INT32_MAX.
 *
 * Only the min/max values of the shard intervals are filled in. Thus, the
 * result should not be used for general purpose operations.
 */
static ShardInterval **
SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount)
{
	ShardInterval **intervalArray = palloc(sizeof(ShardInterval *) * shardCount);
	Datum upperBound = Int32GetDatum(INT32_MAX);
	int position = shardCount;

	/* walk backwards so that each interval can derive its max from its successor */
	while (position > 0)
	{
		ShardInterval *interval = NULL;
		Datum lowerBound = 0;

		position--;

		lowerBound = shardMinValues[position];
		interval = CitusMakeNode(ShardInterval);

		interval->minValue = lowerBound;
		interval->maxValue = upperBound;
		intervalArray[position] = interval;

		/* the preceding interval ends right before this one starts */
		upperBound = Int32GetDatum(DatumGetInt32(lowerBound) - 1);
	}

	return intervalArray;
}
/*
* GetFunctionInfo first resolves the operator for the given data type, access
* method, and support procedure. The function then uses the resolved operator's
@ -1156,6 +1256,52 @@ RangePartitionId(Datum partitionValue, const void *context)
/*
 * HashPartitionId determines the partition number for the given data value
 * using hash partitioning. The value is first hashed with the hash function
 * stored in the given HashPartitionContext. If the hash function yields a zero
 * datum, the function returns partition zero. Otherwise, the hash token is
 * mapped to a partition in one of two ways, which is intended to follow the
 * same approach as the Citus distributed planner:
 *
 *  - when the context is flagged as having a uniform hash distribution, the
 *    partition is computed arithmetically from the size of each hash range;
 *  - otherwise, the partition is looked up in the context's synthetic shard
 *    interval array via SearchCachedShardInterval().
 *
 * The returned value is always smaller than the context's partitionCount; the
 * function errors out if no matching interval can be found.
 *
 * NOTE(review): a value whose hash token is exactly zero is always placed into
 * partition zero, which is not necessarily the interval that contains the
 * token zero. Confirm that this shortcut is intended.
 */
static uint32
HashPartitionId(Datum partitionValue, const void *context)
{
	HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
	FmgrInfo *hashFunction = hashPartitionContext->hashFunction;
	uint32 partitionCount = hashPartitionContext->partitionCount;
	ShardInterval **syntheticShardIntervalArray =
		hashPartitionContext->syntheticShardIntervalArray;
	FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction;
	Datum hashDatum = FunctionCall1(hashFunction, partitionValue);
	uint32 hashPartitionId = 0;

	if (hashDatum == 0)
	{
		return hashPartitionId;
	}

	if (hashPartitionContext->hasUniformHashDistribution)
	{
		uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount;

		/*
		 * Normalize the hash token to the 0..UINT32_MAX range using 64-bit
		 * arithmetic, since subtracting INT32_MIN from a non-negative int32
		 * overflows.
		 */
		int64 normalizedHashValue = (int64) DatumGetInt32(hashDatum) -
									(int64) INT32_MIN;

		hashPartitionId = (uint32) ((uint64) normalizedHashValue / hashTokenIncrement);

		/*
		 * If the partition count does not evenly divide the hash space, the
		 * last range is larger than the others. Tokens in that extra piece
		 * still belong to the last partition.
		 */
		if (hashPartitionId >= partitionCount)
		{
			hashPartitionId = partitionCount - 1;
		}
	}
	else
	{
		int shardIndex = SearchCachedShardInterval(hashDatum,
												   syntheticShardIntervalArray,
												   partitionCount,
												   comparisonFunction);

		/* do not let a "not found" result turn into an out-of-bounds index */
		if (shardIndex < 0 || (uint32) shardIndex >= partitionCount)
		{
			ereport(ERROR, (errmsg("could not find a partition for the hashed "
								   "value in worker_hash_partition_table()")));
		}

		hashPartitionId = (uint32) shardIndex;
	}

	return hashPartitionId;
}
/*
* HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility
* between the Citus versions 7.4 and older versions.
*
* HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value
* using hash partitioning. More specifically, the function returns zero if the
* given data value is null. If not, the function applies the standard Postgres
* hashing function for the given data type, and mods the hashed result with the
* number of partitions. The function then returns the modded number as the
@ -1166,7 +1312,7 @@ RangePartitionId(Datum partitionValue, const void *context)
* see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4."
*/
static uint32
HashPartitionId(Datum partitionValue, const void *context)
HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context)
{
HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context;
FmgrInfo *hashFunction = hashPartitionContext->hashFunction;

View File

@ -90,6 +90,8 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void FlushDistTableCache(void);
extern void InvalidateMetadataSystemCache(void);
extern Datum DistNodeMetadata(void);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);

View File

@ -30,9 +30,10 @@ typedef enum JoinRuleType
JOIN_RULE_INVALID_FIRST = 0,
REFERENCE_JOIN = 1,
LOCAL_PARTITION_JOIN = 2,
SINGLE_PARTITION_JOIN = 3,
DUAL_PARTITION_JOIN = 4,
CARTESIAN_PRODUCT = 5,
SINGLE_HASH_PARTITION_JOIN = 3,
SINGLE_RANGE_PARTITION_JOIN = 4,
DUAL_PARTITION_JOIN = 5,
CARTESIAN_PRODUCT = 6,
/*
* Add new join rule types above this comment. After adding, you must also
@ -75,6 +76,7 @@ typedef struct JoinOrderNode
/* Config variables managed via guc.c */
extern bool LogMultiJoinOrder;
extern bool EnableSingleHashRepartitioning;
/* Function declaration for determining table join orders */

View File

@ -38,7 +38,7 @@
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %d)"
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
(" UINT64_FORMAT ", %d, '%s', '%s')"
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \
@ -69,7 +69,8 @@ typedef enum
{
PARTITION_INVALID_FIRST = 0,
RANGE_PARTITION_TYPE = 1,
HASH_PARTITION_TYPE = 2
SINGLE_HASH_PARTITION_TYPE = 2,
DUAL_HASH_PARTITION_TYPE = 3
} PartitionType;
@ -323,6 +324,8 @@ extern int CompareShardPlacements(const void *leftElement, const void *rightElem
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);

View File

@ -36,6 +36,10 @@ extern int ShardIndex(ShardInterval *shardInterval);
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
DistTableCacheEntry *cacheEntry);
extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry);
extern int SearchCachedShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, FmgrInfo *compareFunction);
extern bool SingleReplicatedTable(Oid relationId);
#endif /* SHARDINTERVAL_UTILS_H_ */

View File

@ -15,6 +15,7 @@
#define WORKER_PROTOCOL_H
#include "fmgr.h"
#include "distributed/shardinterval_utils.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "storage/fd.h"
@ -73,7 +74,10 @@ typedef struct RangePartitionContext
/*
 * HashPartitionContext holds the state needed to map a partition column value
 * to a hash partition. It is handed to partition-id functions through their
 * opaque (const void *context) argument and cast back to this type there, as
 * HashPartitionIdViaDeprecatedAPI() does.
 */
typedef struct HashPartitionContext
{
/* function-manager info for the hash function applied to the partition value */
FmgrInfo *hashFunction;
/*
 * NOTE(review): presumably the comparison function used when searching
 * syntheticShardIntervalArray (cf. the compareFunction parameter of
 * SearchCachedShardInterval) -- confirm against the partitioning code.
 */
FmgrInfo *comparisonFunction;
/*
 * Array of shard interval pointers describing the target partitions.
 * NOTE(review): presumably built on the worker from the split points passed
 * to worker_hash_partition_table rather than read from metadata -- confirm.
 */
ShardInterval **syntheticShardIntervalArray;
/*
 * Number of partitions to produce.
 * NOTE(review): presumably also the length of syntheticShardIntervalArray --
 * confirm.
 */
uint32 partitionCount;
/*
 * NOTE(review): presumably the cached result of HasUniformHashDistribution()
 * for syntheticShardIntervalArray, used to choose how the partition id is
 * computed -- confirm.
 */
bool hasUniformHashDistribution;
} HashPartitionContext;

View File

@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2';
ALTER EXTENSION citus UPDATE TO '7.2-3';
ALTER EXTENSION citus UPDATE TO '7.3-3';
ALTER EXTENSION citus UPDATE TO '7.4-1';
ALTER EXTENSION citus UPDATE TO '7.4-2';
ALTER EXTENSION citus UPDATE TO '7.4-3';
-- show running version
SHOW citus.version;
citus.version

View File

@ -103,7 +103,7 @@ ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
EXPLAIN SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ]
LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
@ -161,7 +161,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
-- range partitioned one.
EXPLAIN SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ single partition join "customer_append" ]
LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)

View File

@ -51,7 +51,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ]
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ]
QUERY PLAN
------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
@ -94,7 +94,7 @@ GROUP BY
c_comment
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ][ reference join "nation" ]
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ]
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
@ -135,7 +135,7 @@ WHERE
AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK')
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ single partition join "part_append" ]
LOG: join order: [ "lineitem" ][ single range partition join "part_append" ]
QUERY PLAN
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
@ -154,7 +154,7 @@ WHERE
c_custkey = o_custkey
GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part_append" ][ single partition join "customer_append" ]
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ]
QUERY PLAN
--------------------------------------------------------------------------
HashAggregate (cost=0.00..0.00 rows=0 width=0)

View File

@ -967,14 +967,15 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
WHERE a.id = b.id AND a.author_id = 1
ORDER BY 1 DESC;
id | author_id | title | word_count | id | author_id | title | word_count
----+-----------+--------------+------------+----+-----------+--------------+------------
41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814
11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347
31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271
1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572
21 | 1 | arcading | 5890 | 21 | 1 | arcading | 5890
11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347
1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572
(5 rows)
SET citus.enable_repartition_joins TO OFF;
@ -1779,15 +1780,15 @@ DETAIL: Creating dependency on merge taskId 2
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 4
DEBUG: pruning merge fetch taskId 9
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 6
DEBUG: pruning merge fetch taskId 15
DETAIL: Creating dependency on merge taskId 8
ERROR: the query contains a join that requires repartitioning

View File

@ -41,7 +41,7 @@ SET citus.shard_replication_factor to 1;
-- for interval [8,10] repartition join logic will be triggered.
SET citus.enable_repartition_joins to ON;
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ]
LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ]
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
count

View File

@ -0,0 +1,465 @@
--
-- MULTI_SINGLE_HASH_REPARTITION_JOIN
--
CREATE SCHEMA single_hash_repartition;
SET search_path TO 'single_hash_repartition';
SET citus.enable_single_hash_repartition_joins TO ON;
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
CREATE TABLE ref_table (id int, sum int, avg float);
SET citus.shard_count TO 4;
SELECT create_distributed_table('single_hash_repartition_first', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('single_hash_repartition_second', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_reference_table('ref_table');
create_reference_table
------------------------
(1 row)
SET citus.log_multi_join_order TO ON;
SET client_min_messages TO DEBUG2;
-- a very basic single hash re-partitioning example
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- the same query with the order of the tables changed
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
t2.sum = t1.id;
LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- single hash repartition after bcast joins
EXPLAIN SELECT
count(*)
FROM
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
r1.id = t1.id AND t2.sum = t1.id;
LOG: join order: [ "single_hash_repartition_second" ][ reference join "ref_table" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- a more complicated join order, first colocated join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- a more complicated join order, first hash-repartition join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.sum = t2.sum AND t1.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ dual partition join "single_hash_repartition_first" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 24
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- single hash repartitioning is not supported between different column types
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.avg = t3.id;
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
DEBUG: single partition column types do not match
DEBUG: single partition column types do not match
DEBUG: dual partition column types do not match
LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ cartesian product "single_hash_repartition_second" ]
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
-- single repartition query in CTE
-- should work fine
EXPLAIN WITH cte1 AS
(
SELECT
t1.id * t2.avg as data
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum
AND t1.sum > 5000
ORDER BY 1 DESC
LIMIT 50
)
SELECT
count(*)
FROM
cte1, single_hash_repartition_first
WHERE
cte1.data > single_hash_repartition_first.id;
DEBUG: generating subplan 7_1 for CTE cte1: SELECT ((t1.id)::double precision * t2.avg) AS data FROM single_hash_repartition.single_hash_repartition_first t1, single_hash_repartition.single_hash_repartition_second t2 WHERE ((t1.id = t2.sum) AND (t1.sum > 5000)) ORDER BY ((t1.id)::double precision * t2.avg) DESC LIMIT 50
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: push down of limit count: 50
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- two single repartitions
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.sum = t3.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 24
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- two single repartitions again, but this
-- time the columns of the second join are reversed
EXPLAIN SELECT
avg(t1.avg + t2.avg)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.id = t3.sum
LIMIT 10;
LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ]
DEBUG: push down of limit count: 10
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 14
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 24
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- lets try one more thing, show that non uniform shard distribution doesn't
-- end up with single hash repartitioning
UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
-- the following queries should also be single hash repartition queries
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [-1073741824,-2]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [0,1073741822]
DEBUG: join prunable for intervals [-2147483648,-1073741826] and [1073741824,2147483646]
DEBUG: join prunable for intervals [-1073741824,-2] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [-1073741824,-2] and [0,1073741822]
DEBUG: join prunable for intervals [-1073741824,-2] and [1073741824,2147483646]
DEBUG: join prunable for intervals [0,1073741822] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [0,1073741822] and [-1073741824,-2]
DEBUG: join prunable for intervals [0,1073741822] and [1073741824,2147483646]
DEBUG: join prunable for intervals [1073741824,2147483646] and [-2147483648,-1073741826]
DEBUG: join prunable for intervals [1073741824,2147483646] and [-1073741824,-2]
DEBUG: join prunable for intervals [1073741824,2147483646] and [0,1073741822]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- the following queries should also be single hash repartition queries
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.sum = t2.id;
LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 3
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 20
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
RESET client_min_messages;
RESET search_path;
DROP SCHEMA single_hash_repartition CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table single_hash_repartition.single_hash_repartition_first
drop cascades to table single_hash_repartition.single_hash_repartition_second
drop cascades to table single_hash_repartition.ref_table
SET citus.enable_single_hash_repartition_joins TO OFF;

View File

@ -46,7 +46,7 @@ ERROR: partition column type 20 and split point type 25 do not match
-- Check that we fail with bad partition column type on hash partitioning
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Name, :Bad_Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
ERROR: partition column types 25 and 20 do not match
-- Now, partition table data using valid arguments
SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,

View File

@ -7,15 +7,11 @@
\set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type '\'int8\''
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Query_Text '\'SELECT * FROM lineitem\''
\set Select_All 'SELECT *'
-- Hash functions internally return unsigned 32-bit integers. However, when
-- called externally, the return value becomes a signed 32-bit integer. We hack
-- around this conversion issue by bitwise-anding the hash results. Note that
-- this only works because we are modding with 4. The proper Hash_Mod_Function
-- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
-- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
-- The hash function is mapped to behave exactly as the Citus planner does
\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_part_00
\set Table_Part_01 lineitem_hash_part_01
\set Table_Part_02 lineitem_hash_part_02
@ -23,7 +19,7 @@
-- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -36,13 +32,25 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
count
-------
3081
2885
(1 row)
SELECT COUNT(*) FROM :Table_Part_01;
count
-------
3009
(1 row)
SELECT COUNT(*) FROM :Table_Part_02;
count
-------
3104
(1 row)
SELECT COUNT(*) FROM :Table_Part_03;
count
-------
2935
3002
(1 row)
-- We first compute the difference of partition tables against the base table.

View File

@ -7,9 +7,10 @@
\set Partition_Column_Text '\'l_partkey\''
\set Partition_Column_Type 23
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
\set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_complex_part_00
\set Table_Part_01 lineitem_hash_complex_part_01
\set Table_Part_02 lineitem_hash_complex_part_02
@ -22,7 +23,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
' WHERE l_shipdate >= date ''1992-01-15'''
' AND l_discount between 0.02 AND 0.08',
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -36,13 +37,13 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
count
-------
1988
1883
(1 row)
SELECT COUNT(*) FROM :Table_Part_03;
count
-------
1881
1913
(1 row)
-- We first compute the difference of partition tables against the base table.

View File

@ -98,14 +98,15 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
-- into the 0th repartition bucket.
\set Hash_TaskId 101107
\set Partition_Count 4
\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set hashTokenIncrement 1073741824
\set Hash_Table_Part_00 supplier_hash_part_00
\set Hash_Table_Part_01 supplier_hash_part_01
\set Hash_Table_Part_02 supplier_hash_part_02
-- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
worker_hash_partition_table
-----------------------------
@ -117,13 +118,13 @@ COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_000
SELECT COUNT(*) FROM :Hash_Table_Part_00;
count
-------
298
282
(1 row)
SELECT COUNT(*) FROM :Hash_Table_Part_02;
count
-------
203
102
(1 row)
-- We first compute the difference of partition tables against the base table.

View File

@ -63,7 +63,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_partition_pruning
test: multi_partition_pruning single_hash_repartition_join
test: multi_join_pruning multi_hash_pruning
test: multi_null_minmax_value_pruning
test: multi_query_directory_cleanup

View File

@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2';
ALTER EXTENSION citus UPDATE TO '7.2-3';
ALTER EXTENSION citus UPDATE TO '7.3-3';
ALTER EXTENSION citus UPDATE TO '7.4-1';
ALTER EXTENSION citus UPDATE TO '7.4-2';
ALTER EXTENSION citus UPDATE TO '7.4-3';
-- show running version
SHOW citus.version;

View File

@ -461,7 +461,8 @@ SELECT *
SET citus.enable_repartition_joins TO ON;
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
WHERE a.id = b.id AND a.author_id = 1
ORDER BY 1 DESC;
SET citus.enable_repartition_joins TO OFF;
-- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor

View File

@ -0,0 +1,143 @@
--
-- MULTI_SINGLE_HASH_REPARTITION_JOIN
--
CREATE SCHEMA single_hash_repartition;
SET search_path TO 'single_hash_repartition';
SET citus.enable_single_hash_repartition_joins TO ON;
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
CREATE TABLE ref_table (id int, sum int, avg float);
SET citus.shard_count TO 4;
SELECT create_distributed_table('single_hash_repartition_first', 'id');
SELECT create_distributed_table('single_hash_repartition_second', 'id');
SELECT create_reference_table('ref_table');
SET citus.log_multi_join_order TO ON;
SET client_min_messages TO DEBUG2;
-- a very basic single hash re-partitioning example
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
-- the same query, but with the order of the tables changed
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
t2.sum = t1.id;
-- single hash repartition after bcast joins
EXPLAIN SELECT
count(*)
FROM
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
WHERE
r1.id = t1.id AND t2.sum = t1.id;
-- a more complicated join order, first colocated join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.sum = t3.id;
-- a more complicated join order, first hash-repartition join, later single hash repartition join
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.sum = t2.sum AND t1.sum = t3.id;
-- single hash repartitioning is not supported between different column types
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
WHERE
t1.id = t2.id AND t1.avg = t3.id;
-- single repartition query in CTE
-- should work fine
EXPLAIN WITH cte1 AS
(
SELECT
t1.id * t2.avg as data
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum
AND t1.sum > 5000
ORDER BY 1 DESC
LIMIT 50
)
SELECT
count(*)
FROM
cte1, single_hash_repartition_first
WHERE
cte1.data > single_hash_repartition_first.id;
-- two single repartitions
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.sum = t3.id;
-- two single repartitions again, but this
-- time the columns of the second join are reversed
EXPLAIN SELECT
avg(t1.avg + t2.avg)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
WHERE
t1.id = t2.sum AND t2.id = t3.sum
LIMIT 10;
-- let's try one more thing: show that a non-uniform shard distribution doesn't
-- end up with single hash repartitioning
UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.id = t2.sum;
-- the following query should also be a single hash repartition query
-- note that since we've manually updated the metadata without changing
-- the corresponding data, the results of the query would be wrong
EXPLAIN SELECT
count(*)
FROM
single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE
t1.sum = t2.id;
RESET client_min_messages;
RESET search_path;
DROP SCHEMA single_hash_repartition CASCADE;
SET citus.enable_single_hash_repartition_joins TO OFF;

View File

@ -54,7 +54,7 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Name, :Bad_Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
-- Now, partition table data using valid arguments

View File

@ -10,18 +10,12 @@
\set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type '\'int8\''
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Query_Text '\'SELECT * FROM lineitem\''
\set Select_All 'SELECT *'
-- Hash functions internally return unsigned 32-bit integers. However, when
-- called externally, the return value becomes a signed 32-bit integer. We hack
-- around this conversion issue by bitwise-anding the hash results. Note that
-- this only works because we are modding with 4. The proper Hash_Mod_Function
-- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
-- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
-- The hash function is mapped to behave exactly as the Citus planner does
\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_part_00
\set Table_Part_01 lineitem_hash_part_01
@ -32,7 +26,7 @@
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';
COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001';
@ -40,6 +34,8 @@ COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002';
COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
SELECT COUNT(*) FROM :Table_Part_00;
SELECT COUNT(*) FROM :Table_Part_01;
SELECT COUNT(*) FROM :Table_Part_02;
SELECT COUNT(*) FROM :Table_Part_03;
-- We first compute the difference of partition tables against the base table.

View File

@ -10,11 +10,12 @@
\set Partition_Column_Text '\'l_partkey\''
\set Partition_Column_Type 23
\set Partition_Count 4
\set hashTokenIncrement 1073741824
\set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
\set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set Table_Part_00 lineitem_hash_complex_part_00
\set Table_Part_01 lineitem_hash_complex_part_01
@ -30,7 +31,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
' WHERE l_shipdate >= date ''1992-01-15'''
' AND l_discount between 0.02 AND 0.08',
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
-- Copy partitioned data files into tables for testing purposes

View File

@ -69,7 +69,8 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
\set Hash_TaskId 101107
\set Partition_Count 4
\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
\set hashTokenIncrement 1073741824
\set Hash_Table_Part_00 supplier_hash_part_00
\set Hash_Table_Part_01 supplier_hash_part_01
@ -79,7 +80,7 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type,
:Partition_Count);
ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000';
COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';