From 317dd02a2f9c244ebbd5b45eeb79c950be693205 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 29 Mar 2018 09:44:00 +0300 Subject: [PATCH] Implement single repartitioning on hash distributed tables * Change worker_hash_partition_table() so that its hashing matches the Citus planner's hashing, eliminating the divergence between the two. * Rename single partitioning to single range partitioning. * Add single hash repartitioning. The logical planner treats single hash and single range partitioning almost equally. The physical planner, on the other hand, treats single hash and dual hash repartitioning almost equally (except for JoinPruning). * Add a new GUC to enable this feature.
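The following usage sketch is illustrative only (the GUC and table names are taken from the regression tests added below; the statements are not part of the original commit message):

SET citus.enable_single_hash_repartition_joins TO on;
SET citus.enable_repartition_joins TO on;
-- one side stays put; only single_hash_repartition_second is repartitioned,
-- and the planner logs:
-- [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
SELECT count(*)
FROM single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE t1.id = t2.sum;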
--- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.4-2--7.4-3.sql | 13 + src/backend/distributed/citus.control | 2 +- .../distributed/planner/multi_join_order.c | 83 +++- .../planner/multi_logical_planner.c | 58 ++- .../planner/multi_physical_planner.c | 113 ++++- src/backend/distributed/shared_library_init.c | 11 + .../distributed/utils/metadata_cache.c | 4 +- .../distributed/utils/shardinterval_utils.c | 16 +- .../worker/worker_partition_protocol.c | 158 +++++- src/include/distributed/metadata_cache.h | 2 + src/include/distributed/multi_join_order.h | 8 +- .../distributed/multi_physical_planner.h | 7 +- src/include/distributed/shardinterval_utils.h | 4 + src/include/distributed/worker_protocol.h | 4 + src/test/regress/expected/multi_extension.out | 2 + .../expected/multi_join_order_additional.out | 4 +- .../multi_join_order_tpch_repartition.out | 8 +- .../regress/expected/multi_router_planner.out | 15 +- .../expected/non_colocated_join_order.out | 2 +- .../expected/single_hash_repartition_join.out | 465 ++++++++++++++++++ .../worker_check_invalid_arguments.out | 2 +- .../expected/worker_hash_partition.out | 28 +- .../worker_hash_partition_complex.out | 9 +- .../expected/worker_null_data_partition.out | 9 +- src/test/regress/multi_schedule | 2 +- src/test/regress/sql/multi_extension.sql | 2 + src/test/regress/sql/multi_router_planner.sql | 3 +- .../sql/single_hash_repartition_join.sql | 143 ++++++ .../sql/worker_check_invalid_arguments.sql | 2 +- .../regress/sql/worker_hash_partition.sql | 16 +- .../sql/worker_hash_partition_complex.sql | 5 +- .../sql/worker_null_data_partition.sql | 5 +- 33 files changed, 1094 insertions(+), 115 deletions(-) create mode 100644 src/backend/distributed/citus--7.4-2--7.4-3.sql create mode 100644 src/test/regress/expected/single_hash_repartition_join.out create mode 100644 src/test/regress/sql/single_hash_repartition_join.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 44f692404..710bead37 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -15,7 +15,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.1-1 7.1-2 7.1-3 7.1-4 \ 7.2-1 7.2-2 7.2-3 \ 7.3-1 7.3-2 7.3-3 \ - 7.4-1 7.4-2 + 7.4-1 7.4-2 7.4-3 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -197,6 +197,8 @@ $(EXTENSION)--7.4-1.sql: $(EXTENSION)--7.3-3.sql $(EXTENSION)--7.3-3--7.4-1.sql cat $^ > $@ $(EXTENSION)--7.4-2.sql: $(EXTENSION)--7.4-1.sql $(EXTENSION)--7.4-1--7.4-2.sql cat $^ > $@ +$(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.4-2--7.4-3.sql b/src/backend/distributed/citus--7.4-2--7.4-3.sql new file mode 100644 index 000000000..733e5219d --- /dev/null +++ b/src/backend/distributed/citus--7.4-2--7.4-3.sql @@ -0,0 +1,13 @@ +/* citus--7.4-2--7.4-3 */ +SET search_path = 'pg_catalog'; + +-- note that we're not dropping the older version of the function +CREATE FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, anyarray) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_hash_partition_table$$; +COMMENT ON FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, + anyarray) + IS 'hash partition query results'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 5926e7513..71a541d86 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.4-2' +default_version = '7.4-3' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog
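An existing installation picks up the new function through the standard extension upgrade path, as exercised in multi_extension.out below:

ALTER EXTENSION citus UPDATE TO '7.4-3';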
strdup("cartesian product"); @@ -1043,6 +1049,9 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, Var *currentPartitionColumn = currentJoinNode->partitionColumn; char currentPartitionMethod = currentJoinNode->partitionMethod; TableEntry *currentAnchorTable = currentJoinNode->anchorTable; + JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType; + + OpExpr *joinClause = NULL; Oid relationId = candidateTable->relationId; uint32 tableId = candidateTable->rangeTableId; @@ -1056,22 +1065,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } /* - * If we previously dual-hash re-partitioned the tables for a join, we - * currently don't allow a single-repartition join. + * If we previously dual-hash re-partitioned the tables for a join or made + * cartesian product, we currently don't allow a single-repartition join. */ - if (currentPartitionMethod == REDISTRIBUTE_BY_HASH) + if (currentJoinRuleType == DUAL_PARTITION_JOIN || + currentJoinRuleType == CARTESIAN_PRODUCT) { return NULL; } - if (currentPartitionMethod != DISTRIBUTE_BY_HASH) + joinClause = + SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses); + if (joinClause != NULL) { - OpExpr *joinClause = SinglePartitionJoinClause(currentPartitionColumn, - applicableJoinClauses); - - if (joinClause != NULL) + if (currentPartitionMethod == DISTRIBUTE_BY_HASH) { - nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN, + /* + * Single hash repartitioning may perform worse than dual hash + * repartitioning. Thus, we control it via a guc. + */ + if (!EnableSingleHashRepartitioning) + { + return NULL; + } + + nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN, + currentPartitionColumn, + currentPartitionMethod, + currentAnchorTable); + } + else + { + nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, currentPartitionColumn, currentPartitionMethod, currentAnchorTable); @@ -1079,18 +1104,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } /* evaluate re-partitioning the current table only if the rule didn't apply above */ - if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH && - candidatePartitionMethod != DISTRIBUTE_BY_NONE) + if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_NONE) { OpExpr *joinClause = SinglePartitionJoinClause(candidatePartitionColumn, applicableJoinClauses); if (joinClause != NULL) { - nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN, - candidatePartitionColumn, - candidatePartitionMethod, - candidateTable); + if (candidatePartitionMethod == DISTRIBUTE_BY_HASH) + { + /* + * Single hash repartitioning may perform worse than dual hash + * repartitioning. Thus, we control it via a guc. 
+ if (!EnableSingleHashRepartitioning) + { + return NULL; + } + + nextJoinNode = MakeJoinOrderNode(candidateTable, + SINGLE_HASH_PARTITION_JOIN, + candidatePartitionColumn, + candidatePartitionMethod, + candidateTable); + } + else + { + nextJoinNode = MakeJoinOrderNode(candidateTable, + SINGLE_RANGE_PARTITION_JOIN, + candidatePartitionColumn, + candidatePartitionMethod, + candidateTable); + } } } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 256f42268..b6dd0dcce 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -94,7 +94,15 @@ static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); -static MultiNode * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, +static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode, + MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses); +static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode, + MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses); +static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, @@ -2208,7 +2216,10 @@ JoinRuleApplyFunction(JoinRuleType ruleType) { RuleApplyFunctionArray[REFERENCE_JOIN] = &ApplyReferenceJoin; RuleApplyFunctionArray[LOCAL_PARTITION_JOIN] = &ApplyLocalJoin; - RuleApplyFunctionArray[SINGLE_PARTITION_JOIN] = &ApplySinglePartitionJoin; + RuleApplyFunctionArray[SINGLE_HASH_PARTITION_JOIN] = + &ApplySingleHashPartitionJoin; + RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = + &ApplySingleRangePartitionJoin; RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin; RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct; @@ -2264,12 +2275,50 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, } + /* + * ApplySingleRangePartitionJoin is a wrapper around ApplySinglePartitionJoin() + * which sets the joinRuleType properly. + */ +static MultiNode * +ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses) +{ + MultiJoin *joinNode = + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + applicableJoinClauses); + + joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN; + + return (MultiNode *) joinNode; +} + + +/* + * ApplySingleHashPartitionJoin is a wrapper around ApplySinglePartitionJoin() + * which sets the joinRuleType properly. + */ +static MultiNode * +ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses) +{ + MultiJoin *joinNode = + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + applicableJoinClauses); + + joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN; + + return (MultiNode *) joinNode; +} + + /* * ApplySinglePartitionJoin creates a new MultiJoin node that joins the left and * right node. The function also adds a MultiPartition node on top of the node * (left or right) that is not partitioned on the join column.
*/ -static MultiNode * +static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *applicableJoinClauses) @@ -2337,11 +2386,10 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, } /* finally set join operator fields */ - joinNode->joinRuleType = SINGLE_PARTITION_JOIN; joinNode->joinType = joinType; joinNode->joinClauseList = applicableJoinClauses; - return (MultiNode *) joinNode; + return joinNode; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a0e7803ad..7093277e3 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -14,6 +14,7 @@ #include "postgres.h" #include +#include #include "miscadmin.h" @@ -320,7 +321,8 @@ BuildJobTree(MultiTreeRoot *multiTree) if (currentNodeType == T_MultiJoin) { MultiJoin *joinNode = (MultiJoin *) currentNode; - if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN || + if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN || + joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN || joinNode->joinRuleType == DUAL_PARTITION_JOIN) { boundaryNodeJobType = JOIN_MAP_MERGE_JOB; @@ -350,14 +352,19 @@ BuildJobTree(MultiTreeRoot *multiTree) PartitionType partitionType = PARTITION_INVALID_FIRST; Oid baseRelationId = InvalidOid; - if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN) + if (joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN) { partitionType = RANGE_PARTITION_TYPE; baseRelationId = RangePartitionJoinBaseRelationId(joinNode); } + else if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN) + { + partitionType = SINGLE_HASH_PARTITION_TYPE; + baseRelationId = RangePartitionJoinBaseRelationId(joinNode); + } else if (joinNode->joinRuleType == DUAL_PARTITION_JOIN) { - partitionType = HASH_PARTITION_TYPE; + partitionType = DUAL_HASH_PARTITION_TYPE; } if (CitusIsA(leftChildNode, MultiPartition)) @@ -411,7 +418,8 @@ BuildJobTree(MultiTreeRoot *multiTree) Query *jobQuery = BuildJobQuery(queryNode, dependedJobList); MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList, - partitionKey, HASH_PARTITION_TYPE, + partitionKey, + DUAL_HASH_PARTITION_TYPE, InvalidOid, SUBQUERY_MAP_MERGE_JOB); @@ -1849,21 +1857,20 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, * If join type is not set, this means this job represents a subquery, and * uses hash partitioning. 
 */ - if (partitionType == HASH_PARTITION_TYPE) + if (partitionType == DUAL_HASH_PARTITION_TYPE) { uint32 partitionCount = HashPartitionCount(); - mapMergeJob->partitionType = HASH_PARTITION_TYPE; + mapMergeJob->partitionType = DUAL_HASH_PARTITION_TYPE; mapMergeJob->partitionCount = partitionCount; } - else if (partitionType == RANGE_PARTITION_TYPE) + else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType == + RANGE_PARTITION_TYPE) { - /* build the split point object for the table on the right-hand side */ DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId); bool hasUninitializedShardInterval = false; uint32 shardCount = cache->shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray; - char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY = 0; hasUninitializedShardInterval = cache->hasUninitializedShardInterval; if (hasUninitializedShardInterval) { @@ -1872,12 +1879,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, "missing min/max values"))); } - basePartitionMethod = PartitionMethod(baseRelationId); - - /* this join-type currently doesn't work for hash partitioned tables */ - Assert(basePartitionMethod != DISTRIBUTE_BY_HASH); - - mapMergeJob->partitionType = RANGE_PARTITION_TYPE; + mapMergeJob->partitionType = partitionType; mapMergeJob->partitionCount = shardCount; mapMergeJob->sortedShardIntervalArray = sortedShardIntervalArray; mapMergeJob->sortedShardIntervalArrayLength = shardCount; @@ -2729,7 +2731,7 @@ DependsOnHashPartitionJob(Job *job) if (CitusIsA(dependedJob, MapMergeJob)) { MapMergeJob *mapMergeJob = (MapMergeJob *) dependedJob; - if (mapMergeJob->partitionType == HASH_PARTITION_TYPE) + if (mapMergeJob->partitionType == DUAL_HASH_PARTITION_TYPE) { dependsOnHashPartitionJob = true; } @@ -3758,6 +3760,7 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment Task *leftMergeTask = (Task *) leftFragment->fragmentReference; Task *rightMergeTask = (Task *) rightFragment->fragmentReference; + if (leftMergeTask->partitionId != rightMergeTask->partitionId) { ereport(DEBUG2, (errmsg("join prunable for task partitionId %u and %u", @@ -3771,8 +3774,9 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment } } + /* - * We have a range (re)partition join. We now get shard intervals for both + * We have a single (re)partition join. We now get shard intervals for both * fragments, and then check if these intervals overlap. */
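/* Illustration, not part of the patch: for two 4-shard hash distributed tables, the repartitioned side's merge tasks carry intervals matching the other side's shards, so this check prunes 12 of the 16 possible fragment pairs; the regression tests below log each pruned pair as, e.g., "join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]". */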
leftFragmentInterval = FragmentInterval(leftFragment); @@ -4261,13 +4265,32 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) filterQueryEscapedText, partitionColumnName, partitionColumnTypeFullName, splitPointString->data); } + else if (partitionType == SINGLE_HASH_PARTITION_TYPE) + { + ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; + uint32 intervalCount = mapMergeJob->partitionCount; + + ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); + StringInfo splitPointString = SplitPointArrayString(splitPointObject, + partitionColumnType, + partitionColumnTypeMod); + appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, + filterQueryEscapedText, partitionColumnName, + partitionColumnTypeFullName, splitPointString->data); + } else { uint32 partitionCount = mapMergeJob->partitionCount; + ShardInterval **intervalArray = + GenerateSyntheticShardIntervalArray(partitionCount); + ArrayType *splitPointObject = SplitPointObject(intervalArray, + mapMergeJob->partitionCount); + StringInfo splitPointString = + SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID)); appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, filterQueryEscapedText, partitionColumnName, - partitionColumnTypeFullName, partitionCount); + partitionColumnTypeFullName, splitPointString->data); } /* convert filter query task into map task */ @@ -4282,6 +4305,46 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) } + /* + * GenerateSyntheticShardIntervalArray returns a shard interval pointer array + * which has a uniform hash distribution for the given input partitionCount. + * + * The function only fills in the min/max values of the shard intervals. Thus, it + * should not be used for general-purpose operations. + */ +ShardInterval ** +GenerateSyntheticShardIntervalArray(int partitionCount) +{ + ShardInterval **shardIntervalArray = palloc0(partitionCount * + sizeof(ShardInterval *)); + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount; + int shardIndex = 0; + + for (shardIndex = 0; shardIndex < partitionCount; ++shardIndex) + { + ShardInterval *shardInterval = CitusMakeNode(ShardInterval); + + /* calculate the split of the hash space */ + int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); + int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); + + shardInterval->relationId = InvalidOid; + shardInterval->minValueExists = true; + shardInterval->minValue = Int32GetDatum(shardMinHashToken); + + shardInterval->maxValueExists = true; + shardInterval->maxValue = Int32GetDatum(shardMaxHashToken); + + shardInterval->shardId = INVALID_SHARD_ID; + shardInterval->valueTypeId = INT4OID; + + shardIntervalArray[shardIndex] = shardInterval; + } + + return shardIntervalArray; +} + +
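/* Worked example, not part of the patch: with partitionCount = 4, hashTokenIncrement is HASH_TOKEN_COUNT / 4 = 1073741824, so the synthetic intervals come out as [-2147483648,-1073741825], [-1073741824,-1], [0,1073741823] and [1073741824,2147483647] -- exactly the intervals shown in the join pruning output of the regression tests below. */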
/* * ColumnName resolves the given column's name. The given column could belong to * a regular table or to an intermediate table formed to execute a distributed @@ -4396,6 +4459,10 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) initialPartitionId = 1; partitionCount = partitionCount + 1; } + else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE) + { + initialPartitionId = 0; + } /* build merge tasks and their associated "map output fetch" tasks */ for (partitionId = initialPartitionId; partitionId < partitionCount; partitionId++) @@ -4463,7 +4530,7 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) /* merge task depends on completion of fetch tasks */ mergeTask->dependedTaskList = mapOutputFetchTaskList; - /* if range repartitioned, each merge task represents an interval */ + /* if single repartitioned, each merge task represents an interval */ if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE) { int32 mergeTaskIntervalId = partitionId - 1; @@ -4472,6 +4539,14 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId]; } + else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE) + { + int32 mergeTaskIntervalId = partitionId; + ShardInterval **mergeTaskIntervals = mapMergeJob->sortedShardIntervalArray; + Assert(mergeTaskIntervalId >= 0); + + mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId]; + } mergeTaskList = lappend(mergeTaskList, mergeTask); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c8fa3c82a..29eb737be 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -373,6 +373,17 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_single_hash_repartition_joins", + gettext_noop("Enables single hash repartitioning between hash " + "distributed tables"), + NULL, + &EnableSingleHashRepartitioning, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.subquery_pushdown", gettext_noop("Enables supported subquery pushdown to workers."), diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 6609fb06c..150255f58 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -168,8 +168,6 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra int shardCount, FmgrInfo * shardIntervalSortCompareFunction); -static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, - int shardIntervalArrayLength); static void PrepareWorkerNodeCache(void); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); @@ -1181,7 +1179,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount, * has a uniform hash distribution, as produced by master_create_worker_shards for * hash partitioned tables.
 */ -static bool +bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength) { diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 6af2b52d7..d8ab229fa 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -25,11 +25,6 @@ #include "utils/memutils.h" -static int SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, FmgrInfo *compareFunction); - - /* * LowestShardIntervalById returns the shard interval with the lowest shard * ID from a list of shard intervals. @@ -332,8 +327,17 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry) * matching a given partition column value and returns its index in the cached * array. If it cannot find any shard interval with the given value, it returns * INVALID_SHARD_INDEX. + * + * TODO: Data re-partitioning logic (e.g., worker_hash_partition_table()) + * on the worker nodes relies on this function in order to be consistent + * with shard pruning. Since the worker nodes don't have the metadata, a + * synthetically generated ShardInterval ** is passed to this + * function. The synthetic shard intervals contain only shardmin and shardmax + * values. A proper implementation of this approach would introduce an + * intermediate data structure (e.g., ShardRange) on which this function + * operates instead of operating on shard intervals. */ -static int +int SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction) { diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index a4be31b3b..3a0ba7542 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -23,15 +23,18 @@ #include #include #include +#include #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" #include "catalog/pg_am.h" #include "catalog/pg_collation.h" +#include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" #include "distributed/multi_copy.h" +#include "distributed/multi_physical_planner.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" @@ -53,6 +56,9 @@ static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */ /* Local functions forward declarations */ +static ShardInterval ** SyntheticShardIntervalArrayForShardMinValues( + Datum *shardMinValues, + int shardCount); static StringInfo InitTaskAttemptDirectory(uint64 jobId, uint32 taskId); static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount); static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount); @@ -73,6 +79,7 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); static uint32 RangePartitionId(Datum partitionValue, const void *context); static uint32 HashPartitionId(Datum partitionValue, const void *context); +static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context); static bool FileIsLink(char *filename, struct stat filestat); @@ -178,28 +185,88 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
text *filterQueryText = PG_GETARG_TEXT_P(2); text *partitionColumnText = PG_GETARG_TEXT_P(3); Oid partitionColumnType = PG_GETARG_OID(4); - uint32 partitionCount = PG_GETARG_UINT32(5); + ArrayType *hashRangeObject = NULL; const char *filterQuery = text_to_cstring(filterQueryText); const char *partitionColumn = text_to_cstring(partitionColumnText); HashPartitionContext *partitionContext = NULL; FmgrInfo *hashFunction = NULL; + Datum *hashRangeArray = NULL; + int32 partitionCount = 0; StringInfo taskDirectory = NULL; StringInfo taskAttemptDirectory = NULL; FileOutputStream *partitionFileArray = NULL; - uint32 fileCount = partitionCount; + uint32 fileCount = 0; + + uint32 (*HashPartitionIdFunction)(Datum, const void *); + + Oid partitionBucketOid = InvalidOid; CheckCitusVersion(ERROR); + partitionContext = palloc0(sizeof(HashPartitionContext)); + + /* + * We do this hack for backward compatibility. + * + * In older versions of Citus, worker_hash_partition_table()'s sixth parameter + * was an integer denoting the number of buckets into which to split the shard's + * data. In later versions of Citus, the sixth parameter is an array of hash + * ranges along which the shard's data is split. + * + * Supporting both forms is important when the coordinator's Citus version is + * <= 7.3 and the worker's Citus version is > 7.3. + */ + partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5); + if (partitionBucketOid == INT4ARRAYOID) + { + hashRangeObject = PG_GETARG_ARRAYTYPE_P(5); + + hashRangeArray = DeconstructArrayObject(hashRangeObject); + partitionCount = ArrayObjectCount(hashRangeObject); + + partitionContext->syntheticShardIntervalArray = + SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount); + partitionContext->hasUniformHashDistribution = + HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray, + partitionCount); + + HashPartitionIdFunction = &HashPartitionId; + } + else if (partitionBucketOid == INT4OID) + { + partitionCount = PG_GETARG_UINT32(5); + + partitionContext->syntheticShardIntervalArray = + GenerateSyntheticShardIntervalArray(partitionCount); + partitionContext->hasUniformHashDistribution = true; + + HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI; + } + else + { + /* we should never get any other parameter type */ + ereport(ERROR, (errmsg("unexpected parameter for " + "worker_hash_partition_table()"))); + }
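/* Hypothetical examples of the two accepted call forms (argument values are illustrative, not taken from the patch). A 7.4+ coordinator sends the hash ranges as an array: SELECT worker_hash_partition_table(42, 1, 'SELECT * FROM t', 'id', 'int'::regtype, ARRAY[-2147483648, -1073741824, 0, 1073741824]); a pre-7.4 coordinator still sends a plain bucket count: SELECT worker_hash_partition_table(42, 1, 'SELECT * FROM t', 'id', 'int'::regtype, 4); */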
+ /* use column's type information to get the hashing function */ hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC); - /* create hash partition context object */ - partitionContext = palloc0(sizeof(HashPartitionContext)); + /* we create as many files as the number of split points */ + fileCount = partitionCount; + partitionContext->hashFunction = hashFunction; partitionContext->partitionCount = partitionCount; + /* we'll use binary search, we need the comparison function */ + if (!partitionContext->hasUniformHashDistribution) + { + partitionContext->comparisonFunction = + GetFunctionInfo(partitionColumnType, BTREE_AM_OID, BTORDER_PROC); + } + /* init directories and files to write the partitioned data to */ taskDirectory = InitTaskDirectory(jobId, taskId); taskAttemptDirectory = InitTaskAttemptDirectory(jobId, taskId); @@ -209,7 +276,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) /* call the partitioning function that does the actual work */ FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType, - &HashPartitionId, (const void *) partitionContext, + HashPartitionIdFunction, (const void *) partitionContext, partitionFileArray, fileCount); /* close partition files and atomically rename (commit) them */ @@ -221,6 +288,39 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) } + /* + * SyntheticShardIntervalArrayForShardMinValues returns a shard interval pointer array + * whose minimum values are taken from the input shardMinValues array. Note that + * we simply calculate each shard's max value by decrementing the following shard's + * min value by one. + * + * The function only fills in the min/max values of the shard intervals. Thus, it + * should not be used for general-purpose operations. + */ +static ShardInterval ** +SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount) +{ + int shardIndex = 0; + Datum nextShardMaxValue = Int32GetDatum(INT32_MAX); + ShardInterval **syntheticShardIntervalArray = + palloc(sizeof(ShardInterval *) * shardCount); + + for (shardIndex = shardCount - 1; shardIndex >= 0; --shardIndex) + { + Datum currentShardMinValue = shardMinValues[shardIndex]; + ShardInterval *shardInterval = CitusMakeNode(ShardInterval); + + shardInterval->minValue = currentShardMinValue; + shardInterval->maxValue = nextShardMaxValue; + + nextShardMaxValue = Int32GetDatum(DatumGetInt32(currentShardMinValue) - 1); + + syntheticShardIntervalArray[shardIndex] = shardInterval; + } + + return syntheticShardIntervalArray; +} + + /* * GetFunctionInfo first resolves the operator for the given data type, access * method, and support procedure. The function then uses the resolved operator's @@ -1156,6 +1256,52 @@ RangePartitionId(Datum partitionValue, const void *context) /* * HashPartitionId determines the partition number for the given data value * using hash partitioning. More specifically, the function returns zero if the + * given data value is null. If not, the function follows the exact same approach + * as the Citus distributed planner uses. + */ +static uint32 +HashPartitionId(Datum partitionValue, const void *context) +{ + HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context; + FmgrInfo *hashFunction = hashPartitionContext->hashFunction; + uint32 partitionCount = hashPartitionContext->partitionCount; + ShardInterval **syntheticShardIntervalArray = + hashPartitionContext->syntheticShardIntervalArray; + FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction; + Datum hashDatum = FunctionCall1(hashFunction, partitionValue); + int32 hashResult = 0; + uint32 hashPartitionId = 0; + + if (hashDatum == 0) + { + return hashPartitionId; + } + + if (hashPartitionContext->hasUniformHashDistribution) + { + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount; + + hashResult = DatumGetInt32(hashDatum); + hashPartitionId = (uint32) (hashResult - INT32_MIN) / hashTokenIncrement; + } + else + { + hashPartitionId = + SearchCachedShardInterval(hashDatum, syntheticShardIntervalArray, + partitionCount, comparisonFunction); + } + + + return hashPartitionId; +} + +
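/* Worked example, not part of the patch: with four uniform partitions, hashTokenIncrement = HASH_TOKEN_COUNT / 4 = 1073741824, so a hash value of 0 maps to partition (0 - INT32_MIN) / 1073741824 = 2, i.e. the interval [0,1073741823]; with non-uniform ranges, the binary search over the synthetic shard intervals yields the same partition id. */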
/* + * HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility + * between Citus version 7.4 and older versions. + * + * HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value + * using hash partitioning. More specifically, the function returns zero if the * given data value is null. If not, the function applies the standard Postgres * hashing function for the given data type, and mods the hashed result with the * number of partitions. The function then returns the modded number as the @@ -1166,7 +1312,7 @@ RangePartitionId(Datum partitionValue, const void *context) * see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4." */ static uint32 -HashPartitionId(Datum partitionValue, const void *context) +HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context) { HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context; FmgrInfo *hashFunction = hashPartitionContext->hashFunction; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 9bf1e09ef..0b34a51fc 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -90,6 +90,8 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void FlushDistTableCache(void); extern void InvalidateMetadataSystemCache(void); extern Datum DistNodeMetadata(void); +extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength); extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index cecaa20be..ba871b1da 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -30,9 +30,10 @@ typedef enum JoinRuleType JOIN_RULE_INVALID_FIRST = 0, REFERENCE_JOIN = 1, LOCAL_PARTITION_JOIN = 2, - SINGLE_PARTITION_JOIN = 3, - DUAL_PARTITION_JOIN = 4, - CARTESIAN_PRODUCT = 5, + SINGLE_HASH_PARTITION_JOIN = 3, + SINGLE_RANGE_PARTITION_JOIN = 4, + DUAL_PARTITION_JOIN = 5, + CARTESIAN_PRODUCT = 6, /* * Add new join rule types above this comment. After adding, you must also @@ -75,6 +76,7 @@ typedef struct JoinOrderNode /* Config variables managed via guc.c */ extern bool LogMultiJoinOrder; +extern bool EnableSingleHashRepartitioning; /* Function declaration for determining table join orders */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ce2dbc3d7..a9b805a9b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -38,7 +38,7 @@ #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" #define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \ - (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %d)" + (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" #define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \ (" UINT64_FORMAT ", %d, '%s', '%s')" #define MERGE_FILES_AND_RUN_QUERY_COMMAND \ @@ -69,7 +69,8 @@ typedef enum { PARTITION_INVALID_FIRST = 0, RANGE_PARTITION_TYPE = 1, - HASH_PARTITION_TYPE = 2 + SINGLE_HASH_PARTITION_TYPE = 2, + DUAL_HASH_PARTITION_TYPE = 3 } PartitionType; @@ -323,6 +324,8 @@ extern int CompareShardPlacements(const void *leftElement, const void *rightElem extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); +extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); + /* function declarations for Task and Task list operations */ extern bool TasksEqual(const Task *a, const Task *b); diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 5c9d6cde8..825358e4f 100644 ---
a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -36,6 +36,10 @@ extern int ShardIndex(ShardInterval *shardInterval); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry); extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry); +extern int SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction); extern bool SingleReplicatedTable(Oid relationId); + #endif /* SHARDINTERVAL_UTILS_H_ */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index d78c47c30..4321fab9c 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -15,6 +15,7 @@ #define WORKER_PROTOCOL_H #include "fmgr.h" +#include "distributed/shardinterval_utils.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "storage/fd.h" @@ -73,7 +74,10 @@ typedef struct RangePartitionContext typedef struct HashPartitionContext { FmgrInfo *hashFunction; + FmgrInfo *comparisonFunction; + ShardInterval **syntheticShardIntervalArray; uint32 partitionCount; + bool hasUniformHashDistribution; } HashPartitionContext; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 35d51da29..ade89fc55 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2'; ALTER EXTENSION citus UPDATE TO '7.2-3'; ALTER EXTENSION citus UPDATE TO '7.3-3'; ALTER EXTENSION citus UPDATE TO '7.4-1'; +ALTER EXTENSION citus UPDATE TO '7.4-2'; +ALTER EXTENSION citus UPDATE TO '7.4-3'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 594cf8513..1b2a10a2e 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -103,7 +103,7 @@ ERROR: cannot perform distributed planning on this query DETAIL: Cartesian products are currently unsupported EXPLAIN SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; -LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] +LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) @@ -161,7 +161,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] -- range partitioned one. 
EXPLAIN SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; -LOG: join order: [ "orders_hash" ][ single partition join "customer_append" ] +LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) diff --git a/src/test/regress/expected/multi_join_order_tpch_repartition.out b/src/test/regress/expected/multi_join_order_tpch_repartition.out index f11e6f470..9ca9aab86 100644 --- a/src/test/regress/expected/multi_join_order_tpch_repartition.out +++ b/src/test/regress/expected/multi_join_order_tpch_repartition.out @@ -51,7 +51,7 @@ GROUP BY ORDER BY revenue DESC, o_orderdate; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ] QUERY PLAN ------------------------------------------------------------------------------------------------ Sort (cost=0.00..0.00 rows=0 width=0) @@ -94,7 +94,7 @@ GROUP BY c_comment ORDER BY revenue DESC; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ][ reference join "nation" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ] QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) @@ -135,7 +135,7 @@ WHERE AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK') AND l_shipinstruct = 'DELIVER IN PERSON' ); -LOG: join order: [ "lineitem" ][ single partition join "part_append" ] +LOG: join order: [ "lineitem" ][ single range partition join "part_append" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) @@ -154,7 +154,7 @@ WHERE c_custkey = o_custkey GROUP BY l_partkey; -LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part_append" ][ single partition join "customer_append" ] +LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ] QUERY PLAN -------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index bca1f09c1..2ac6780dc 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -967,14 +967,15 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM articles_hash a, articles_hash b - WHERE a.id = b.id AND a.author_id = 1; + WHERE a.id = b.id AND a.author_id = 1 +ORDER BY 1 DESC; id | author_id | title | word_count | id | author_id | title | word_count ----+-----------+--------------+------------+----+-----------+--------------+------------ 41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814 - 11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347 31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271 - 1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572 21 | 1 | arcading | 5890 | 21 | 1 | 
arcading | 5890 + 11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347 + 1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572 (5 rows) SET citus.enable_repartition_joins TO OFF; @@ -1779,15 +1780,15 @@ DETAIL: Creating dependency on merge taskId 2 DEBUG: pruning merge fetch taskId 3 DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 4 -DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 9 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 11 -DETAIL: Creating dependency on merge taskId 6 -DEBUG: pruning merge fetch taskId 13 DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 13 +DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 15 DETAIL: Creating dependency on merge taskId 8 ERROR: the query contains a join that requires repartitioning diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out index e22d73463..1be9e76b1 100644 --- a/src/test/regress/expected/non_colocated_join_order.out +++ b/src/test/regress/expected/non_colocated_join_order.out @@ -41,7 +41,7 @@ SET citus.shard_replication_factor to 1; -- for interval [8,10] repartition join logic will be triggered. SET citus.enable_repartition_joins to ON; SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ] +LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ] DEBUG: cannot use real time executor with repartition jobs HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
count diff --git a/src/test/regress/expected/single_hash_repartition_join.out b/src/test/regress/expected/single_hash_repartition_join.out new file mode 100644 index 000000000..62f99afce --- /dev/null +++ b/src/test/regress/expected/single_hash_repartition_join.out @@ -0,0 +1,465 @@ +-- +-- MULTI_SINGLE_HASH_REPARTITION_JOIN +-- +CREATE SCHEMA single_hash_repartition; +SET search_path TO 'single_hash_repartition'; +SET citus.enable_single_hash_repartition_joins TO ON; +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); +CREATE TABLE ref_table (id int, sum int, avg float); +SET citus.shard_count TO 4; +SELECT create_distributed_table('single_hash_repartition_first', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('single_hash_repartition_second', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +SET citus.log_multi_join_order TO ON; +SET client_min_messages TO DEBUG2; +-- a very basic single hash re-partitioning example +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.id = t2.sum; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- the same query with the order of the tables changed +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + t2.sum = t1.id; +LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and
[-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- single hash repartition after broadcast joins
+EXPLAIN SELECT
+ count(*)
+FROM
+ ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
+WHERE
+ r1.id = t1.id AND t2.sum = t1.id;
+LOG: join order: [ "single_hash_repartition_second" ][ reference join "ref_table" ][ single hash partition join "single_hash_repartition_first" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- a more complicated join order, first colocated join, later single hash repartition join
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.id AND t1.sum = t3.id;
+LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- a more complicated join order, first hash-repartition join, later single hash repartition join
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.sum = t2.sum AND t1.sum = t3.id;
+LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ dual partition join "single_hash_repartition_first" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+DEBUG: join prunable for task partitionId 0 and 1
+DEBUG: join prunable for task partitionId 0 and 2
+DEBUG: join prunable for task partitionId 0 and 3
+DEBUG: join prunable for task partitionId 1 and 0
+DEBUG: join prunable for task partitionId 1 and 2
+DEBUG: join prunable for task partitionId 1 and 3
+DEBUG: join prunable for task partitionId 2 and 0
+DEBUG: join prunable for task partitionId 2 and 1
+DEBUG: join prunable for task partitionId 2 and 3
+DEBUG: join prunable for task partitionId 3 and 0
+DEBUG: join prunable for task partitionId 3 and 1
+DEBUG: join prunable for task partitionId 3 and 2
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 9
+DEBUG: pruning merge fetch taskId 2
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 4
+DETAIL: Creating dependency on merge taskId 14
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 19
+DEBUG: pruning merge fetch taskId 8
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 10
+DETAIL: Creating dependency on merge taskId 24
+DEBUG: pruning merge fetch taskId 11
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- single hash repartitioning is not supported between different column types
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.id AND t1.avg = t3.id;
+DEBUG: single partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: dual partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: dual partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: dual partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: single partition column types do not match
+DEBUG: dual partition column types do not match
+LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ cartesian product "single_hash_repartition_second" ]
+ERROR: cannot perform distributed planning on this query
+DETAIL: Cartesian products are currently unsupported
+-- single repartition query in CTE
+-- should work fine
+EXPLAIN WITH cte1 AS
+(
+ SELECT
+ t1.id * t2.avg as data
+ FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+ WHERE
+ t1.id = t2.sum
+ AND t1.sum > 5000
+ ORDER BY 1 DESC
+ LIMIT 50
+)
+SELECT
+ count(*)
+FROM
+ cte1, single_hash_repartition_first
+WHERE
+ cte1.data > single_hash_repartition_first.id;
+DEBUG: generating subplan 7_1 for CTE cte1: SELECT ((t1.id)::double precision * t2.avg) AS data FROM single_hash_repartition.single_hash_repartition_first t1, single_hash_repartition.single_hash_repartition_second t2 WHERE ((t1.id = t2.sum) AND (t1.sum > 5000)) ORDER BY ((t1.id)::double precision * t2.avg) DESC LIMIT 50
+LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
+DEBUG: push down of limit count: 50
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- two single repartitions
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.sum AND t2.sum = t3.id;
+LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 9
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 14
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 19
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 24
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- two single repartitions again, but this
+-- time the columns of the second join are reversed
+EXPLAIN SELECT
+ avg(t1.avg + t2.avg)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.sum AND t2.id = t3.sum
+LIMIT 10;
+LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ]
+DEBUG: push down of limit count: 10
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 9
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 14
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 19
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 24
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- let's try one more thing: show that non-uniform shard distribution doesn't
+-- end up with single hash repartitioning
+ UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
+-- the following queries should also be single hash repartition queries
+-- note that since we've manually updated the metadata without changing the
+-- corresponding data, the results of the query would be wrong
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+WHERE
+ t1.id = t2.sum;
+LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741826] and [-1073741824,-2]
+DEBUG: join prunable for intervals [-2147483648,-1073741826] and [0,1073741822]
+DEBUG: join prunable for intervals [-2147483648,-1073741826] and [1073741824,2147483646]
+DEBUG: join prunable for intervals [-1073741824,-2] and [-2147483648,-1073741826]
+DEBUG: join prunable for intervals [-1073741824,-2] and [0,1073741822]
+DEBUG: join prunable for intervals [-1073741824,-2] and [1073741824,2147483646]
+DEBUG: join prunable for intervals [0,1073741822] and [-2147483648,-1073741826]
+DEBUG: join prunable for intervals [0,1073741822] and [-1073741824,-2]
+DEBUG: join prunable for intervals [0,1073741822] and [1073741824,2147483646]
+DEBUG: join prunable for intervals [1073741824,2147483646] and [-2147483648,-1073741826]
+DEBUG: join prunable for intervals [1073741824,2147483646] and [-1073741824,-2]
+DEBUG: join prunable for intervals [1073741824,2147483646] and [0,1073741822]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+-- the following queries should also be single hash repartition queries
+-- note that since we've manually updated the metadata without changing the
+-- corresponding data, the results of the query would be wrong
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+WHERE
+ t1.sum = t2.id;
+LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 5
+DEBUG: pruning merge fetch taskId 3
+DETAIL: Creating dependency on merge taskId 10
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 20
+ERROR: the query contains a join that requires repartitioning
+HINT: Set citus.enable_repartition_joins to on to enable repartitioning
+RESET client_min_messages;
+RESET search_path;
+DROP SCHEMA single_hash_repartition CASCADE;
+NOTICE: drop cascades to 3 other objects
+DETAIL: drop cascades to table single_hash_repartition.single_hash_repartition_first
+drop cascades to table single_hash_repartition.single_hash_repartition_second
+drop cascades to table single_hash_repartition.ref_table
+SET citus.enable_single_hash_repartition_joins TO OFF;
diff --git a/src/test/regress/expected/worker_check_invalid_arguments.out b/src/test/regress/expected/worker_check_invalid_arguments.out
index fbc91b468..058539fce 100644
--- a/src/test/regress/expected/worker_check_invalid_arguments.out
+++ b/src/test/regress/expected/worker_check_invalid_arguments.out
@@ -46,7 +46,7 @@ ERROR: partition column type 20 and split point type 25 do not match
 -- Check that we fail with bad partition column type on hash partitioning
 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
 :Partition_Column_Name, :Bad_Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 ERROR: partition column types 25 and 20 do not match
 -- Now, partition table data using valid arguments
 SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
diff --git a/src/test/regress/expected/worker_hash_partition.out b/src/test/regress/expected/worker_hash_partition.out
index 61725f4f7..63ed5f27d 100644
--- a/src/test/regress/expected/worker_hash_partition.out
+++ b/src/test/regress/expected/worker_hash_partition.out
@@ -7,15 +7,11 @@
 \set Partition_Column_Text '\'l_orderkey\''
 \set Partition_Column_Type '\'int8\''
 \set Partition_Count 4
+\set hashTokenIncrement 1073741824
 \set Select_Query_Text '\'SELECT * FROM lineitem\''
 \set Select_All 'SELECT *'
--- Hash functions internally return unsigned 32-bit integers. However, when
--- called externally, the return value becomes a signed 32-bit integer. We hack
--- around this conversion issue by bitwise-anding the hash results. Note that
--- this only works because we are modding with 4. The proper Hash_Mod_Function
--- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
--- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
-\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
+-- The hash function is mapped to behave exactly as the Citus planner does
+\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
 \set Table_Part_00 lineitem_hash_part_00
 \set Table_Part_01 lineitem_hash_part_01
 \set Table_Part_02 lineitem_hash_part_02
@@ -23,7 +19,7 @@
 -- Run select query, and apply hash partitioning on query results
 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
 :Partition_Column_Text, :Partition_Column_Type::regtype,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 worker_hash_partition_table
-----------------------------
@@ -36,13 +32,25 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
 SELECT COUNT(*) FROM :Table_Part_00;
 count
-------
- 3081
+ 2885
+(1 row)
+
+SELECT COUNT(*) FROM :Table_Part_01;
+ count
+-------
+ 3009
+(1 row)
+
+SELECT COUNT(*) FROM :Table_Part_02;
+ count
+-------
+ 3104
 (1 row)
 SELECT COUNT(*) FROM :Table_Part_03;
 count
-------
- 2935
+ 3002
 (1 row)
 -- We first compute the difference of partition tables against the base table.
diff --git a/src/test/regress/expected/worker_hash_partition_complex.out b/src/test/regress/expected/worker_hash_partition_complex.out
index 2dfbe3702..a90f4d505 100644
--- a/src/test/regress/expected/worker_hash_partition_complex.out
+++ b/src/test/regress/expected/worker_hash_partition_complex.out
@@ -7,9 +7,10 @@
 \set Partition_Column_Text '\'l_partkey\''
 \set Partition_Column_Type 23
 \set Partition_Count 4
+\set hashTokenIncrement 1073741824
 \set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
 \set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
-\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
+\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
 \set Table_Part_00 lineitem_hash_complex_part_00
 \set Table_Part_01 lineitem_hash_complex_part_01
 \set Table_Part_02 lineitem_hash_complex_part_02
@@ -22,7 +23,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
 ' WHERE l_shipdate >= date ''1992-01-15'''
 ' AND l_discount between 0.02 AND 0.08',
 :Partition_Column_Text, :Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 worker_hash_partition_table
-----------------------------
@@ -36,13 +37,13 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003';
 SELECT COUNT(*) FROM :Table_Part_00;
 count
-------
- 1988
+ 1883
 (1 row)
 SELECT COUNT(*) FROM :Table_Part_03;
 count
-------
- 1881
+ 1913
 (1 row)
 -- We first compute the difference of partition tables against the base table.
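The updated Hash_Mod_Function works because, with 4 partitions, the signed 32-bit hash space [-2147483648, 2147483647] splits into four uniform ranges of width 1073741824 (the hashTokenIncrement), so the bucket index is simply (hash - INT32_MIN) / increment; this is the same arithmetic the planner uses for uniform hash shard intervals. A minimal psql sketch of that mapping, using hypothetical key values:

-- map a few hashed keys onto the four uniform buckets; bucket 0 covers
-- [-2147483648,-1073741825], bucket 1 covers [-1073741824,-1], and so on
SELECT key,
       hashint8(key) AS hash_value,
       (hashint8(key)::int8 - (-2147483648)) / 1073741824 AS bucket_index
FROM (VALUES (1::int8), (42::int8), (10000::int8)) AS t(key);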
diff --git a/src/test/regress/expected/worker_null_data_partition.out b/src/test/regress/expected/worker_null_data_partition.out
index 11881fbc9..344658142 100644
--- a/src/test/regress/expected/worker_null_data_partition.out
+++ b/src/test/regress/expected/worker_null_data_partition.out
@@ -98,14 +98,15 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
 -- into the 0th repartition bucket.
 \set Hash_TaskId 101107
 \set Partition_Count 4
-\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
+\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
+\set hashTokenIncrement 1073741824
 \set Hash_Table_Part_00 supplier_hash_part_00
 \set Hash_Table_Part_01 supplier_hash_part_01
 \set Hash_Table_Part_02 supplier_hash_part_02
 -- Run select query, and apply hash partitioning on query results
 SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
 :Partition_Column_Text, :Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 worker_hash_partition_table
-----------------------------
@@ -117,13 +118,13 @@ COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_000
 SELECT COUNT(*) FROM :Hash_Table_Part_00;
 count
-------
- 298
+ 282
 (1 row)
 SELECT COUNT(*) FROM :Hash_Table_Part_02;
 count
-------
- 203
+ 102
 (1 row)
 -- We first compute the difference of partition tables against the base table.
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 4e7b8583b..8156c9e2d 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -63,7 +63,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown
 test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
 test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg
 test: multi_agg_type_conversion multi_count_type_conversion
-test: multi_partition_pruning
+test: multi_partition_pruning single_hash_repartition_join
 test: multi_join_pruning multi_hash_pruning
 test: multi_null_minmax_value_pruning
 test: multi_query_directory_cleanup
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index 45f6ff983..dc2d7dcb1 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2';
 ALTER EXTENSION citus UPDATE TO '7.2-3';
 ALTER EXTENSION citus UPDATE TO '7.3-3';
 ALTER EXTENSION citus UPDATE TO '7.4-1';
+ALTER EXTENSION citus UPDATE TO '7.4-2';
+ALTER EXTENSION citus UPDATE TO '7.4-3';
 -- show running version
 SHOW citus.version;
diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql
index 1d9216577..3c5f4b69a 100644
--- a/src/test/regress/sql/multi_router_planner.sql
+++ b/src/test/regress/sql/multi_router_planner.sql
@@ -461,7 +461,8 @@ SELECT *
 SET citus.enable_repartition_joins TO ON;
 SELECT *
 FROM articles_hash a, articles_hash b
- WHERE a.id = b.id AND a.author_id = 1;
+ WHERE a.id = b.id AND a.author_id = 1
+ORDER BY 1 DESC;
 SET citus.enable_repartition_joins TO OFF;
 -- queries which hit more than 1 shards are not router plannable or executable
 -- handled by real-time executor
diff --git a/src/test/regress/sql/single_hash_repartition_join.sql b/src/test/regress/sql/single_hash_repartition_join.sql
new file mode 100644
index 000000000..8745ac07c
--- /dev/null
+++ b/src/test/regress/sql/single_hash_repartition_join.sql
@@ -0,0 +1,143 @@
+--
+-- MULTI_SINGLE_HASH_REPARTITION_JOIN
+--
+
+CREATE SCHEMA single_hash_repartition;
+
+SET search_path TO 'single_hash_repartition';
+SET citus.enable_single_hash_repartition_joins TO ON;
+
+CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
+CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
+CREATE TABLE ref_table (id int, sum int, avg float);
+
+SET citus.shard_count TO 4;
+
+SELECT create_distributed_table('single_hash_repartition_first', 'id');
+SELECT create_distributed_table('single_hash_repartition_second', 'id');
+SELECT create_reference_table('ref_table');
+
+SET citus.log_multi_join_order TO ON;
+SET client_min_messages TO DEBUG2;
+
+-- a very basic single hash re-partitioning example
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+WHERE
+ t1.id = t2.sum;
+
+-- the same query with the order of the tables changed
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_second t1, single_hash_repartition_first t2
+WHERE
+ t2.sum = t1.id;
+
+-- single hash repartition after broadcast joins
+EXPLAIN SELECT
+ count(*)
+FROM
+ ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
+WHERE
+ r1.id = t1.id AND t2.sum = t1.id;
+
+-- a more complicated join order, first colocated join, later single hash repartition join
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.id AND t1.sum = t3.id;
+
+
+-- a more complicated join order, first hash-repartition join, later single hash repartition join
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.sum = t2.sum AND t1.sum = t3.id;
+
+-- single hash repartitioning is not supported between different column types
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.id AND t1.avg = t3.id;
+
+-- single repartition query in CTE
+-- should work fine
+EXPLAIN WITH cte1 AS
+(
+ SELECT
+ t1.id * t2.avg as data
+ FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+ WHERE
+ t1.id = t2.sum
+ AND t1.sum > 5000
+ ORDER BY 1 DESC
+ LIMIT 50
+)
+SELECT
+ count(*)
+FROM
+ cte1, single_hash_repartition_first
+WHERE
+ cte1.data > single_hash_repartition_first.id;
+
+
+-- two single repartitions
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.sum AND t2.sum = t3.id;
+
+
+-- two single repartitions again, but this
+-- time the columns of the second join are reversed
+EXPLAIN SELECT
+ avg(t1.avg + t2.avg)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3
+WHERE
+ t1.id = t2.sum AND t2.id = t3.sum
+LIMIT 10;
+
+-- let's try one more thing: show that non-uniform shard distribution doesn't
+-- end up with single hash repartitioning
+
+ UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass);
+
+-- the following queries should also be single hash repartition queries
+-- note that since we've manually updated the metadata without changing the
+-- corresponding data, the results of the query would be wrong
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+WHERE
+ t1.id = t2.sum;
+
+-- the following queries should also be single hash repartition queries
+-- note that since we've manually updated the metadata without changing the
+-- corresponding data, the results of the query would be wrong
+EXPLAIN SELECT
+ count(*)
+FROM
+ single_hash_repartition_first t1, single_hash_repartition_second t2
+WHERE
+ t1.sum = t2.id;
+
+RESET client_min_messages;
+
+RESET search_path;
+DROP SCHEMA single_hash_repartition CASCADE;
+SET citus.enable_single_hash_repartition_joins TO OFF;
+
diff --git a/src/test/regress/sql/worker_check_invalid_arguments.sql b/src/test/regress/sql/worker_check_invalid_arguments.sql
index abceee14c..c851d8b16 100644
--- a/src/test/regress/sql/worker_check_invalid_arguments.sql
+++ b/src/test/regress/sql/worker_check_invalid_arguments.sql
@@ -54,7 +54,7 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text,
 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
 :Partition_Column_Name, :Bad_Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 -- Now, partition table data using valid arguments
diff --git a/src/test/regress/sql/worker_hash_partition.sql b/src/test/regress/sql/worker_hash_partition.sql
index 024b46ed7..6137638fe 100644
--- a/src/test/regress/sql/worker_hash_partition.sql
+++ b/src/test/regress/sql/worker_hash_partition.sql
@@ -10,18 +10,12 @@
 \set Partition_Column_Text '\'l_orderkey\''
 \set Partition_Column_Type '\'int8\''
 \set Partition_Count 4
-
+\set hashTokenIncrement 1073741824
 \set Select_Query_Text '\'SELECT * FROM lineitem\''
 \set Select_All 'SELECT *'
--- Hash functions internally return unsigned 32-bit integers. However, when
--- called externally, the return value becomes a signed 32-bit integer. We hack
--- around this conversion issue by bitwise-anding the hash results. Note that
--- this only works because we are modding with 4. The proper Hash_Mod_Function
--- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4)
--- else ((hashint8(l_orderkey) + 4294967296) % 4) end).
-
-\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )'
+-- The hash function is mapped to behave exactly as the Citus planner does
+\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
 \set Table_Part_00 lineitem_hash_part_00
 \set Table_Part_01 lineitem_hash_part_01
@@ -32,7 +26,7 @@
 SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
 :Partition_Column_Text, :Partition_Column_Type::regtype,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';
 COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001';
@@ -40,6 +34,8 @@ COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002';
 COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003';
 SELECT COUNT(*) FROM :Table_Part_00;
+SELECT COUNT(*) FROM :Table_Part_01;
+SELECT COUNT(*) FROM :Table_Part_02;
 SELECT COUNT(*) FROM :Table_Part_03;
 -- We first compute the difference of partition tables against the base table.
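Throughout these tests the last argument to worker_hash_partition_table() changes from a bare partition count to an array of hash-range split points, where each element is the inclusive lower bound of one bucket. A hedged sketch of a standalone call, with placeholder job and task ids (the split points mirror the four uniform buckets used above):

-- hypothetical ids chosen for illustration only
SELECT worker_hash_partition_table(201010, 101199,
           'SELECT * FROM lineitem', 'l_orderkey', 'int8'::regtype,
           ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);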
diff --git a/src/test/regress/sql/worker_hash_partition_complex.sql b/src/test/regress/sql/worker_hash_partition_complex.sql
index 5b860ec71..722109332 100644
--- a/src/test/regress/sql/worker_hash_partition_complex.sql
+++ b/src/test/regress/sql/worker_hash_partition_complex.sql
@@ -10,11 +10,12 @@
 \set Partition_Column_Text '\'l_partkey\''
 \set Partition_Column_Type 23
 \set Partition_Count 4
+\set hashTokenIncrement 1073741824
 \set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment'
 \set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08'
-\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )'
+\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
 \set Table_Part_00 lineitem_hash_complex_part_00
 \set Table_Part_01 lineitem_hash_complex_part_01
@@ -30,7 +31,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId,
 ' WHERE l_shipdate >= date ''1992-01-15'''
 ' AND l_discount between 0.02 AND 0.08',
 :Partition_Column_Text, :Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 -- Copy partitioned data files into tables for testing purposes
diff --git a/src/test/regress/sql/worker_null_data_partition.sql b/src/test/regress/sql/worker_null_data_partition.sql
index 557904187..b6fa47d3d 100644
--- a/src/test/regress/sql/worker_null_data_partition.sql
+++ b/src/test/regress/sql/worker_null_data_partition.sql
@@ -69,7 +69,8 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
 \set Hash_TaskId 101107
 \set Partition_Count 4
-\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )'
+\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8'
+\set hashTokenIncrement 1073741824
 \set Hash_Table_Part_00 supplier_hash_part_00
 \set Hash_Table_Part_01 supplier_hash_part_01
@@ -79,7 +80,7 @@ SELECT COUNT(*) AS diff_rhs_02 FROM (
 SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text,
 :Partition_Column_Text, :Partition_Column_Type,
- :Partition_Count);
+ ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
 COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000';
 COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';
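For completeness, the planner-side switches these regression tests depend on: citus.enable_single_hash_repartition_joins (the new GUC, off by default) gates the single hash repartition join rule, and, as the HINT lines in the expected output suggest, citus.enable_repartition_joins must also be on for the join to actually execute. A small usage sketch:

-- opt in for the session, then the planner may pick the new join rule
SET citus.enable_single_hash_repartition_joins TO ON;
SET citus.enable_repartition_joins TO ON;
EXPLAIN SELECT count(*)
FROM single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE t1.id = t2.sum;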