diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 44f692404..710bead37 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -15,7 +15,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.1-1 7.1-2 7.1-3 7.1-4 \ 7.2-1 7.2-2 7.2-3 \ 7.3-1 7.3-2 7.3-3 \ - 7.4-1 7.4-2 + 7.4-1 7.4-2 7.4-3 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -197,6 +197,8 @@ $(EXTENSION)--7.4-1.sql: $(EXTENSION)--7.3-3.sql $(EXTENSION)--7.3-3--7.4-1.sql cat $^ > $@ $(EXTENSION)--7.4-2.sql: $(EXTENSION)--7.4-1.sql $(EXTENSION)--7.4-1--7.4-2.sql cat $^ > $@ +$(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.4-2--7.4-3.sql b/src/backend/distributed/citus--7.4-2--7.4-3.sql new file mode 100644 index 000000000..733e5219d --- /dev/null +++ b/src/backend/distributed/citus--7.4-2--7.4-3.sql @@ -0,0 +1,13 @@ +/* citus--7.4-2--7.4-3 */ +SET search_path = 'pg_catalog'; + +-- note that we're not dropping the older version of the function +CREATE FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, anyarray) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_hash_partition_table$$; +COMMENT ON FUNCTION worker_hash_partition_table(bigint, integer, text, text, oid, + anyarray) + IS 'hash partition query results'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 5926e7513..71a541d86 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.4-2' +default_version = '7.4-3' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 36f73b0b6..ee1504a86 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -35,6 +35,7 @@ /* Config variables managed via guc.c */ bool LogMultiJoinOrder = false; /* print join order as a debugging aid */ +bool EnableSingleHashRepartitioning = false; /* Function pointer type definition for join rule evaluation functions */ typedef JoinOrderNode *(*RuleEvalFunction) (JoinOrderNode *currentJoinNode, @@ -178,7 +179,7 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList) nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode, nextTable, joinClauseList, joinType); - if (nextJoinNode->joinRuleType >= SINGLE_PARTITION_JOIN) + if (nextJoinNode->joinRuleType >= SINGLE_HASH_PARTITION_JOIN) { /* re-partitioning for OUTER joins is not implemented */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -669,7 +670,8 @@ LargeDataTransferLocation(List *joinOrder) JoinRuleType joinRuleType = joinOrderNode->joinRuleType; /* we consider the following join rules to cause large data transfers */ - if (joinRuleType == SINGLE_PARTITION_JOIN || + if (joinRuleType == SINGLE_HASH_PARTITION_JOIN || + joinRuleType == SINGLE_RANGE_PARTITION_JOIN || joinRuleType == DUAL_PARTITION_JOIN || joinRuleType == CARTESIAN_PRODUCT) { @@ -862,7 +864,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType) { RuleEvalFunctionArray[REFERENCE_JOIN] = &ReferenceJoin; RuleEvalFunctionArray[LOCAL_PARTITION_JOIN] = &LocalJoin; - 
RuleEvalFunctionArray[SINGLE_PARTITION_JOIN] = &SinglePartitionJoin; + RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin; + RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin; RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin; RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct; @@ -888,7 +891,10 @@ JoinRuleName(JoinRuleType ruleType) /* use strdup() to be independent of memory contexts */ RuleNameArray[REFERENCE_JOIN] = strdup("reference join"); RuleNameArray[LOCAL_PARTITION_JOIN] = strdup("local partition join"); - RuleNameArray[SINGLE_PARTITION_JOIN] = strdup("single partition join"); + RuleNameArray[SINGLE_HASH_PARTITION_JOIN] = + strdup("single hash partition join"); + RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] = + strdup("single range partition join"); RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join"); RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product"); @@ -1043,6 +1049,9 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, Var *currentPartitionColumn = currentJoinNode->partitionColumn; char currentPartitionMethod = currentJoinNode->partitionMethod; TableEntry *currentAnchorTable = currentJoinNode->anchorTable; + JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType; + + OpExpr *joinClause = NULL; Oid relationId = candidateTable->relationId; uint32 tableId = candidateTable->rangeTableId; @@ -1056,22 +1065,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } /* - * If we previously dual-hash re-partitioned the tables for a join, we - * currently don't allow a single-repartition join. + * If we previously dual-hash re-partitioned the tables for a join or created + * a cartesian product, we currently don't allow a single-repartition join. */ - if (currentPartitionMethod == REDISTRIBUTE_BY_HASH) + if (currentJoinRuleType == DUAL_PARTITION_JOIN || + currentJoinRuleType == CARTESIAN_PRODUCT) { return NULL; } - if (currentPartitionMethod != DISTRIBUTE_BY_HASH) + joinClause = + SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses); + if (joinClause != NULL) { - OpExpr *joinClause = SinglePartitionJoinClause(currentPartitionColumn, - applicableJoinClauses); - - if (joinClause != NULL) + if (currentPartitionMethod == DISTRIBUTE_BY_HASH) { - nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN, + /* + * Single hash repartitioning may perform worse than dual hash + * repartitioning. Thus, we control it via a guc.
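+ * The setting is exposed as citus.enable_single_hash_repartition_joins
+ * and defaults to off; when it is off we return NULL here, so the planner
+ * falls back to the remaining join rules (e.g., a dual partition join).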
+ */ + if (!EnableSingleHashRepartitioning) + { + return NULL; + } + + nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN, + currentPartitionColumn, + currentPartitionMethod, + currentAnchorTable); + } + else + { + nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, currentPartitionColumn, currentPartitionMethod, currentAnchorTable); @@ -1079,18 +1104,38 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } /* evaluate re-partitioning the current table only if the rule didn't apply above */ - if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH && - candidatePartitionMethod != DISTRIBUTE_BY_NONE) + if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_NONE) { OpExpr *joinClause = SinglePartitionJoinClause(candidatePartitionColumn, applicableJoinClauses); if (joinClause != NULL) { - nextJoinNode = MakeJoinOrderNode(candidateTable, SINGLE_PARTITION_JOIN, - candidatePartitionColumn, - candidatePartitionMethod, - candidateTable); + if (candidatePartitionMethod == DISTRIBUTE_BY_HASH) + { + /* + * Single hash repartitioning may perform worse than dual hash + * repartitioning. Thus, we control it via a guc. + */ + if (!EnableSingleHashRepartitioning) + { + return NULL; + } + + nextJoinNode = MakeJoinOrderNode(candidateTable, + SINGLE_HASH_PARTITION_JOIN, + candidatePartitionColumn, + candidatePartitionMethod, + candidateTable); + } + else + { + nextJoinNode = MakeJoinOrderNode(candidateTable, + SINGLE_RANGE_PARTITION_JOIN, + candidatePartitionColumn, + candidatePartitionMethod, + candidateTable); + } } } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 256f42268..b6dd0dcce 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -94,7 +94,15 @@ static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); -static MultiNode * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, +static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode, + MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses); +static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode, + MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses); +static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, @@ -2208,7 +2216,10 @@ JoinRuleApplyFunction(JoinRuleType ruleType) { RuleApplyFunctionArray[REFERENCE_JOIN] = &ApplyReferenceJoin; RuleApplyFunctionArray[LOCAL_PARTITION_JOIN] = &ApplyLocalJoin; - RuleApplyFunctionArray[SINGLE_PARTITION_JOIN] = &ApplySinglePartitionJoin; + RuleApplyFunctionArray[SINGLE_HASH_PARTITION_JOIN] = + &ApplySingleHashPartitionJoin; + RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = + &ApplySingleRangePartitionJoin; RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin; RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct; @@ -2264,12 +2275,50 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, } +/* + * ApplySingleRangePartitionJoin is a wrapper 
around ApplySinglePartitionJoin() + * which sets the joinRuleType properly. + */ +static MultiNode * +ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses) +{ + MultiJoin *joinNode = + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + applicableJoinClauses); + + joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN; + + return (MultiNode *) joinNode; +} + + +/* + * ApplySingleHashPartitionJoin is a wrapper around ApplySinglePartitionJoin() + * which sets the joinRuleType properly. + */ +static MultiNode * +ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses) +{ + MultiJoin *joinNode = + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + applicableJoinClauses); + + joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN; + + return (MultiNode *) joinNode; +} + + /* * ApplySinglePartitionJoin creates a new MultiJoin node that joins the left and * right node. The function also adds a MultiPartition node on top of the node * (left or right) that is not partitioned on the join column. */ -static MultiNode * +static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *applicableJoinClauses) @@ -2337,11 +2386,10 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, } /* finally set join operator fields */ - joinNode->joinRuleType = SINGLE_PARTITION_JOIN; joinNode->joinType = joinType; joinNode->joinClauseList = applicableJoinClauses; - return (MultiNode *) joinNode; + return joinNode; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a0e7803ad..7093277e3 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -14,6 +14,7 @@ #include "postgres.h" #include <math.h> +#include <stdint.h> #include "miscadmin.h" @@ -320,7 +321,8 @@ BuildJobTree(MultiTreeRoot *multiTree) if (currentNodeType == T_MultiJoin) { MultiJoin *joinNode = (MultiJoin *) currentNode; - if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN || + if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN || + joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN || joinNode->joinRuleType == DUAL_PARTITION_JOIN) { boundaryNodeJobType = JOIN_MAP_MERGE_JOB; } @@ -350,14 +352,19 @@ PartitionType partitionType = PARTITION_INVALID_FIRST; Oid baseRelationId = InvalidOid; - if (joinNode->joinRuleType == SINGLE_PARTITION_JOIN) + if (joinNode->joinRuleType == SINGLE_RANGE_PARTITION_JOIN) { partitionType = RANGE_PARTITION_TYPE; baseRelationId = RangePartitionJoinBaseRelationId(joinNode); } + else if (joinNode->joinRuleType == SINGLE_HASH_PARTITION_JOIN) + { + partitionType = SINGLE_HASH_PARTITION_TYPE; + baseRelationId = RangePartitionJoinBaseRelationId(joinNode); + } else if (joinNode->joinRuleType == DUAL_PARTITION_JOIN) { - partitionType = HASH_PARTITION_TYPE; + partitionType = DUAL_HASH_PARTITION_TYPE; } if (CitusIsA(leftChildNode, MultiPartition)) @@ -411,7 +418,8 @@ BuildJobTree(MultiTreeRoot *multiTree) Query *jobQuery = BuildJobQuery(queryNode, dependedJobList); MapMergeJob *mapMergeJob = BuildMapMergeJob(jobQuery, dependedJobList, - partitionKey, HASH_PARTITION_TYPE, + partitionKey, + DUAL_HASH_PARTITION_TYPE, InvalidOid, 
SUBQUERY_MAP_MERGE_JOB); @@ -1849,21 +1857,20 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, * If join type is not set, this means this job represents a subquery, and * uses hash partitioning. */ - if (partitionType == HASH_PARTITION_TYPE) + if (partitionType == DUAL_HASH_PARTITION_TYPE) { uint32 partitionCount = HashPartitionCount(); - mapMergeJob->partitionType = HASH_PARTITION_TYPE; + mapMergeJob->partitionType = DUAL_HASH_PARTITION_TYPE; mapMergeJob->partitionCount = partitionCount; } - else if (partitionType == RANGE_PARTITION_TYPE) + else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType == + RANGE_PARTITION_TYPE) { - /* build the split point object for the table on the right-hand side */ DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId); bool hasUninitializedShardInterval = false; uint32 shardCount = cache->shardIntervalArrayLength; ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray; - char basePartitionMethod PG_USED_FOR_ASSERTS_ONLY = 0; hasUninitializedShardInterval = cache->hasUninitializedShardInterval; if (hasUninitializedShardInterval) @@ -1872,12 +1879,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, "missing min/max values"))); } - basePartitionMethod = PartitionMethod(baseRelationId); - - /* this join-type currently doesn't work for hash partitioned tables */ - Assert(basePartitionMethod != DISTRIBUTE_BY_HASH); - - mapMergeJob->partitionType = RANGE_PARTITION_TYPE; + mapMergeJob->partitionType = partitionType; mapMergeJob->partitionCount = shardCount; mapMergeJob->sortedShardIntervalArray = sortedShardIntervalArray; mapMergeJob->sortedShardIntervalArrayLength = shardCount; @@ -2729,7 +2731,7 @@ DependsOnHashPartitionJob(Job *job) if (CitusIsA(dependedJob, MapMergeJob)) { MapMergeJob *mapMergeJob = (MapMergeJob *) dependedJob; - if (mapMergeJob->partitionType == HASH_PARTITION_TYPE) + if (mapMergeJob->partitionType == DUAL_HASH_PARTITION_TYPE) { dependsOnHashPartitionJob = true; } @@ -3758,6 +3760,7 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment Task *leftMergeTask = (Task *) leftFragment->fragmentReference; Task *rightMergeTask = (Task *) rightFragment->fragmentReference; + if (leftMergeTask->partitionId != rightMergeTask->partitionId) { ereport(DEBUG2, (errmsg("join prunable for task partitionId %u and %u", @@ -3771,8 +3774,9 @@ JoinPrunable(RangeTableFragment *leftFragment, RangeTableFragment *rightFragment } } + /* - * We have a range (re)partition join. We now get shard intervals for both + * We have a single (re)partition join. We now get shard intervals for both * fragments, and then check if these intervals overlap. 
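+ * For instance, with four uniform hash partitions, fragments for the
+ * intervals [-2147483648,-1073741825] and [-1073741824,-1] do not overlap,
+ * so the join between those two fragments is pruned.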
*/ leftFragmentInterval = FragmentInterval(leftFragment); @@ -4261,13 +4265,32 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) filterQueryEscapedText, partitionColumnName, partitionColumnTypeFullName, splitPointString->data); } + else if (partitionType == SINGLE_HASH_PARTITION_TYPE) + { + ShardInterval **intervalArray = mapMergeJob->sortedShardIntervalArray; + uint32 intervalCount = mapMergeJob->partitionCount; + + ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); + StringInfo splitPointString = SplitPointArrayString(splitPointObject, + partitionColumnType, + partitionColumnTypeMod); + appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, + filterQueryEscapedText, partitionColumnName, + partitionColumnTypeFullName, splitPointString->data); + } else { uint32 partitionCount = mapMergeJob->partitionCount; + ShardInterval **intervalArray = + GenerateSyntheticShardIntervalArray(partitionCount); + ArrayType *splitPointObject = SplitPointObject(intervalArray, + mapMergeJob->partitionCount); + StringInfo splitPointString = + SplitPointArrayString(splitPointObject, INT4OID, get_typmodin(INT4OID)); appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, filterQueryEscapedText, partitionColumnName, - partitionColumnTypeFullName, partitionCount); + partitionColumnTypeFullName, splitPointString->data); } /* convert filter query task into map task */ @@ -4282,6 +4305,46 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) } +/* + * GenerateSyntheticShardIntervalArray returns a shard interval pointer array + * which has a uniform hash distribution for the given input partitionCount. + * + * The function only fills the min/max values of the shard intervals. Thus, it should + * not be used for general purpose operations. + */ +ShardInterval ** +GenerateSyntheticShardIntervalArray(int partitionCount) +{ + ShardInterval **shardIntervalArray = palloc0(partitionCount * + sizeof(ShardInterval *)); + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount; + int shardIndex = 0; + + for (shardIndex = 0; shardIndex < partitionCount; ++shardIndex) + { + ShardInterval *shardInterval = CitusMakeNode(ShardInterval); + + /* calculate the split of the hash space */ + int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); + int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); + + shardInterval->relationId = InvalidOid; + shardInterval->minValueExists = true; + shardInterval->minValue = Int32GetDatum(shardMinHashToken); + + shardInterval->maxValueExists = true; + shardInterval->maxValue = Int32GetDatum(shardMaxHashToken); + + shardInterval->shardId = INVALID_SHARD_ID; + shardInterval->valueTypeId = INT4OID; + + shardIntervalArray[shardIndex] = shardInterval; + } + + return shardIntervalArray; +} + + /* * ColumnName resolves the given column's name. 
The given column could belong to * a regular table or to an intermediate table formed to execute a distributed @@ -4396,6 +4459,10 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) initialPartitionId = 1; partitionCount = partitionCount + 1; } + else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE) + { + initialPartitionId = 0; + } /* build merge tasks and their associated "map output fetch" tasks */ for (partitionId = initialPartitionId; partitionId < partitionCount; partitionId++) @@ -4463,7 +4530,7 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) /* merge task depends on completion of fetch tasks */ mergeTask->dependedTaskList = mapOutputFetchTaskList; - /* if range repartitioned, each merge task represents an interval */ + /* if single repartitioned, each merge task represents an interval */ if (mapMergeJob->partitionType == RANGE_PARTITION_TYPE) { int32 mergeTaskIntervalId = partitionId - 1; ShardInterval **mergeTaskIntervals = mapMergeJob->sortedShardIntervalArray; Assert(mergeTaskIntervalId >= 0); mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId]; } + else if (mapMergeJob->partitionType == SINGLE_HASH_PARTITION_TYPE) + { + int32 mergeTaskIntervalId = partitionId; + ShardInterval **mergeTaskIntervals = mapMergeJob->sortedShardIntervalArray; + Assert(mergeTaskIntervalId >= 0); + + mergeTask->shardInterval = mergeTaskIntervals[mergeTaskIntervalId]; + } mergeTaskList = lappend(mergeTaskList, mergeTask); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c8fa3c82a..29eb737be 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -373,6 +373,17 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_single_hash_repartition_joins", + gettext_noop("Enables single hash repartitioning between hash " + "distributed tables."), + NULL, + &EnableSingleHashRepartitioning, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.subquery_pushdown", gettext_noop("Enables supported subquery pushdown to workers."), diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 6609fb06c..150255f58 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -168,8 +168,6 @@ static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArra int shardCount, FmgrInfo * shardIntervalSortCompareFunction); -static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, - int shardIntervalArrayLength); static void PrepareWorkerNodeCache(void); static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount); @@ -1181,7 +1179,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount, * has a uniform hash distribution, as produced by master_create_worker_shards for * hash partitioned tables. 
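+ * For example, with 4 shards the uniform split of the int32 hash token
+ * space uses hashTokenIncrement = HASH_TOKEN_COUNT / 4 = 1073741824, which
+ * yields the intervals [-2147483648,-1073741825], [-1073741824,-1],
+ * [0,1073741823] and [1073741824,2147483647].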
*/ -static bool +bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, int shardIntervalArrayLength) { diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 6af2b52d7..d8ab229fa 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -25,11 +25,6 @@ #include "utils/memutils.h" -static int SearchCachedShardInterval(Datum partitionColumnValue, - ShardInterval **shardIntervalCache, - int shardCount, FmgrInfo *compareFunction); - - /* * LowestShardIntervalById returns the shard interval with the lowest shard * ID from a list of shard intervals. @@ -332,8 +327,17 @@ FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry) * matching a given partition column value and returns its index in the cached * array. If it cannot find any shard interval with the given value, it returns * INVALID_SHARD_INDEX. + * + * TODO: Data re-partitioning logic (e.g., worker_hash_partition_table()) + * on the worker nodes relies on this function in order to be consistent + * with shard pruning. Since the worker nodes don't have the metadata, a + * synthetically generated ShardInterval ** is passed to this + * function. The synthetic shard intervals contain only shardmin and shardmax + * values. A proper implementation of this approach should introduce an + * intermediate data structure (e.g., ShardRange) on which this function + * operates instead of operating on shard intervals. */ -static int +int SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache, int shardCount, FmgrInfo *compareFunction) { diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index a4be31b3b..3a0ba7542 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -23,15 +23,18 @@ #include #include #include +#include <stdint.h> #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" #include "catalog/pg_am.h" #include "catalog/pg_collation.h" +#include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" #include "distributed/multi_copy.h" +#include "distributed/multi_physical_planner.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" @@ -53,6 +56,9 @@ static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */ /* Local functions forward declarations */ +static ShardInterval ** SyntheticShardIntervalArrayForShardMinValues( + Datum *shardMinValues, + int shardCount); static StringInfo InitTaskAttemptDirectory(uint64 jobId, uint32 taskId); static uint32 FileBufferSize(int partitionBufferSizeInKB, uint32 fileCount); static FileOutputStream * OpenPartitionFiles(StringInfo directoryName, uint32 fileCount); @@ -73,6 +79,7 @@ static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fil static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); static uint32 RangePartitionId(Datum partitionValue, const void *context); static uint32 HashPartitionId(Datum partitionValue, const void *context); +static uint32 HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context); static bool FileIsLink(char *filename, struct stat filestat); @@ -178,28 +185,88 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) 
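+	/*
+	 * Example invocation with the new signature (all values illustrative;
+	 * the array holds the min hash values of the target partitions, here
+	 * four uniform ones):
+	 *   SELECT worker_hash_partition_table(42, 1, 'SELECT ...', 'id',
+	 *       'integer'::regtype,
+	 *       ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
+	 * The deprecated API instead passed an integer partition count as the
+	 * last argument.
+	 */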
text *filterQueryText = PG_GETARG_TEXT_P(2); text *partitionColumnText = PG_GETARG_TEXT_P(3); Oid partitionColumnType = PG_GETARG_OID(4); - uint32 partitionCount = PG_GETARG_UINT32(5); + ArrayType *hashRangeObject = NULL; const char *filterQuery = text_to_cstring(filterQueryText); const char *partitionColumn = text_to_cstring(partitionColumnText); HashPartitionContext *partitionContext = NULL; FmgrInfo *hashFunction = NULL; + Datum *hashRangeArray = NULL; + int32 partitionCount = 0; StringInfo taskDirectory = NULL; StringInfo taskAttemptDirectory = NULL; FileOutputStream *partitionFileArray = NULL; - uint32 fileCount = partitionCount; + uint32 fileCount = 0; + + uint32 (*HashPartitionIdFunction)(Datum, const void *); + + Oid partitionBucketOid = InvalidOid; CheckCitusVersion(ERROR); + partitionContext = palloc0(sizeof(HashPartitionContext)); + + /* + * We do this hack for backward compatibility. + * + * In older versions of Citus, worker_hash_partition_table()'s sixth parameter + * was an integer denoting the number of buckets into which to split the shard's + * data. In later versions of Citus, the sixth parameter was changed to an array + * of shard ranges, which defines the ranges used to split the shard's data. + * + * Keeping the old behavior is important if the coordinator's Citus version is + * <= 7.3 and the worker's Citus version is > 7.3. + */ + partitionBucketOid = get_fn_expr_argtype(fcinfo->flinfo, 5); + if (partitionBucketOid == INT4ARRAYOID) + { + hashRangeObject = PG_GETARG_ARRAYTYPE_P(5); + + hashRangeArray = DeconstructArrayObject(hashRangeObject); + partitionCount = ArrayObjectCount(hashRangeObject); + + partitionContext->syntheticShardIntervalArray = + SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount); + partitionContext->hasUniformHashDistribution = + HasUniformHashDistribution(partitionContext->syntheticShardIntervalArray, + partitionCount); + + HashPartitionIdFunction = &HashPartitionId; + } + else if (partitionBucketOid == INT4OID) + { + partitionCount = PG_GETARG_UINT32(5); + + partitionContext->syntheticShardIntervalArray = + GenerateSyntheticShardIntervalArray(partitionCount); + partitionContext->hasUniformHashDistribution = true; + + HashPartitionIdFunction = &HashPartitionIdViaDeprecatedAPI; + } + else + { + /* we should never get any other parameter type */ + ereport(ERROR, (errmsg("unexpected parameter for " + "worker_hash_partition_table()"))); + } + /* use column's type information to get the hashing function */ hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHSTANDARD_PROC); - /* create hash partition context object */ - partitionContext = palloc0(sizeof(HashPartitionContext)); + /* we create as many files as the number of split points */ + fileCount = partitionCount; + partitionContext->hashFunction = hashFunction; partitionContext->partitionCount = partitionCount; + /* we'll use binary search, so we need the comparison function */ + if (!partitionContext->hasUniformHashDistribution) + { + partitionContext->comparisonFunction = + GetFunctionInfo(partitionColumnType, BTREE_AM_OID, BTORDER_PROC); + } + /* init directories and files to write the partitioned data to */ taskDirectory = InitTaskDirectory(jobId, taskId); taskAttemptDirectory = InitTaskAttemptDirectory(jobId, taskId); @@ -209,7 +276,7 @@ worker_hash_partition_table(PG_FUNCTION_ARGS) /* call the partitioning function that does the actual work */ FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType, - &HashPartitionId, (const void *) 
partitionContext, + HashPartitionIdFunction, (const void *) partitionContext, partitionFileArray, fileCount); /* close partition files and atomically rename (commit) them */ @@ -221,6 +288,39 @@ } +/* + * SyntheticShardIntervalArrayForShardMinValues returns a shard interval pointer array + * whose min values are taken from the input shardMinValues array. Note that + * we simply calculate each shard's max value by decrementing the next shard's + * min value by one; the last shard's max value is INT32_MAX. + * + * The function only fills the min/max values of the shard intervals. Thus, it should + * not be used for general purpose operations. + */ +static ShardInterval ** +SyntheticShardIntervalArrayForShardMinValues(Datum *shardMinValues, int shardCount) +{ + int shardIndex = 0; + Datum nextShardMaxValue = Int32GetDatum(INT32_MAX); + ShardInterval **syntheticShardIntervalArray = + palloc(sizeof(ShardInterval *) * shardCount); + + for (shardIndex = shardCount - 1; shardIndex >= 0; --shardIndex) + { + Datum currentShardMinValue = shardMinValues[shardIndex]; + ShardInterval *shardInterval = CitusMakeNode(ShardInterval); + + shardInterval->minValue = currentShardMinValue; + shardInterval->maxValue = nextShardMaxValue; + + nextShardMaxValue = Int32GetDatum(DatumGetInt32(currentShardMinValue) - 1); + + syntheticShardIntervalArray[shardIndex] = shardInterval; + } + + return syntheticShardIntervalArray; +} + + /* * GetFunctionInfo first resolves the operator for the given data type, access * method, and support procedure. The function then uses the resolved operator's @@ -1156,6 +1256,52 @@ RangePartitionId(Datum partitionValue, const void *context) /* * HashPartitionId determines the partition number for the given data value * using hash partitioning. More specifically, the function returns zero if the + * given data value is null. If not, the function follows the exact same approach + * as the Citus distributed planner uses. + */ +static uint32 +HashPartitionId(Datum partitionValue, const void *context) +{ + HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context; + FmgrInfo *hashFunction = hashPartitionContext->hashFunction; + uint32 partitionCount = hashPartitionContext->partitionCount; + ShardInterval **syntheticShardIntervalArray = + hashPartitionContext->syntheticShardIntervalArray; + FmgrInfo *comparisonFunction = hashPartitionContext->comparisonFunction; + Datum hashDatum = FunctionCall1(hashFunction, partitionValue); + int32 hashResult = 0; + uint32 hashPartitionId = 0; + + if (hashDatum == 0) + { + return hashPartitionId; + } + + if (hashPartitionContext->hasUniformHashDistribution) + { + uint64 hashTokenIncrement = HASH_TOKEN_COUNT / partitionCount; + + hashResult = DatumGetInt32(hashDatum); + hashPartitionId = (uint32) (hashResult - INT32_MIN) / hashTokenIncrement; + } + else + { + hashPartitionId = + SearchCachedShardInterval(hashDatum, syntheticShardIntervalArray, + partitionCount, comparisonFunction); + } + + return hashPartitionId; +} + + +/* + * HashPartitionIdViaDeprecatedAPI is required to provide backward compatibility + * between Citus version 7.4 and older versions. + * + * HashPartitionIdViaDeprecatedAPI determines the partition number for the given data value + * using hash partitioning. More specifically, the function returns zero if the * given data value is null. If not, the function applies the standard Postgres * hashing function for the given data type, and mods the hashed result with the * number of partitions. 
The function then returns the modded number as the @@ -1166,7 +1312,7 @@ RangePartitionId(Datum partitionValue, const void *context) * see Google "PL/Proxy Users: Hash Functions Have Changed in PostgreSQL 8.4." */ static uint32 -HashPartitionId(Datum partitionValue, const void *context) +HashPartitionIdViaDeprecatedAPI(Datum partitionValue, const void *context) { HashPartitionContext *hashPartitionContext = (HashPartitionContext *) context; FmgrInfo *hashFunction = hashPartitionContext->hashFunction; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 9bf1e09ef..0b34a51fc 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -90,6 +90,8 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void FlushDistTableCache(void); extern void InvalidateMetadataSystemCache(void); extern Datum DistNodeMetadata(void); +extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, + int shardIntervalArrayLength); extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index cecaa20be..ba871b1da 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -30,9 +30,10 @@ typedef enum JoinRuleType JOIN_RULE_INVALID_FIRST = 0, REFERENCE_JOIN = 1, LOCAL_PARTITION_JOIN = 2, - SINGLE_PARTITION_JOIN = 3, - DUAL_PARTITION_JOIN = 4, - CARTESIAN_PRODUCT = 5, + SINGLE_HASH_PARTITION_JOIN = 3, + SINGLE_RANGE_PARTITION_JOIN = 4, + DUAL_PARTITION_JOIN = 5, + CARTESIAN_PRODUCT = 6, /* * Add new join rule types above this comment. After adding, you must also @@ -75,6 +76,7 @@ typedef struct JoinOrderNode /* Config variables managed via guc.c */ extern bool LogMultiJoinOrder; +extern bool EnableSingleHashRepartitioning; /* Function declaration for determining table join orders */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ce2dbc3d7..a9b805a9b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -38,7 +38,7 @@ #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" #define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \ - (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %d)" + (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)" #define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \ (" UINT64_FORMAT ", %d, '%s', '%s')" #define MERGE_FILES_AND_RUN_QUERY_COMMAND \ @@ -69,7 +69,8 @@ typedef enum { PARTITION_INVALID_FIRST = 0, RANGE_PARTITION_TYPE = 1, - HASH_PARTITION_TYPE = 2 + SINGLE_HASH_PARTITION_TYPE = 2, + DUAL_HASH_PARTITION_TYPE = 3 } PartitionType; @@ -323,6 +324,8 @@ extern int CompareShardPlacements(const void *leftElement, const void *rightElem extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); +extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); + /* function declarations for Task and Task list operations */ extern bool TasksEqual(const Task *a, const Task *b); diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h index 5c9d6cde8..825358e4f 100644 --- 
a/src/include/distributed/shardinterval_utils.h +++ b/src/include/distributed/shardinterval_utils.h @@ -36,6 +36,10 @@ extern int ShardIndex(ShardInterval *shardInterval); extern ShardInterval * FindShardInterval(Datum partitionColumnValue, DistTableCacheEntry *cacheEntry); extern int FindShardIntervalIndex(Datum searchedValue, DistTableCacheEntry *cacheEntry); +extern int SearchCachedShardInterval(Datum partitionColumnValue, + ShardInterval **shardIntervalCache, + int shardCount, FmgrInfo *compareFunction); extern bool SingleReplicatedTable(Oid relationId); + #endif /* SHARDINTERVAL_UTILS_H_ */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index d78c47c30..4321fab9c 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -15,6 +15,7 @@ #define WORKER_PROTOCOL_H #include "fmgr.h" +#include "distributed/shardinterval_utils.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "storage/fd.h" @@ -73,7 +74,10 @@ typedef struct RangePartitionContext typedef struct HashPartitionContext { FmgrInfo *hashFunction; + FmgrInfo *comparisonFunction; + ShardInterval **syntheticShardIntervalArray; uint32 partitionCount; + bool hasUniformHashDistribution; } HashPartitionContext; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 35d51da29..ade89fc55 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2'; ALTER EXTENSION citus UPDATE TO '7.2-3'; ALTER EXTENSION citus UPDATE TO '7.3-3'; ALTER EXTENSION citus UPDATE TO '7.4-1'; +ALTER EXTENSION citus UPDATE TO '7.4-2'; +ALTER EXTENSION citus UPDATE TO '7.4-3'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 594cf8513..1b2a10a2e 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -103,7 +103,7 @@ ERROR: cannot perform distributed planning on this query DETAIL: Cartesian products are currently unsupported EXPLAIN SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; -LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] +LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) @@ -161,7 +161,7 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] -- range partitioned one. 
EXPLAIN SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; -LOG: join order: [ "orders_hash" ][ single partition join "customer_append" ] +LOG: join order: [ "orders_hash" ][ single range partition join "customer_append" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) diff --git a/src/test/regress/expected/multi_join_order_tpch_repartition.out b/src/test/regress/expected/multi_join_order_tpch_repartition.out index f11e6f470..9ca9aab86 100644 --- a/src/test/regress/expected/multi_join_order_tpch_repartition.out +++ b/src/test/regress/expected/multi_join_order_tpch_repartition.out @@ -51,7 +51,7 @@ GROUP BY ORDER BY revenue DESC, o_orderdate; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ] QUERY PLAN ------------------------------------------------------------------------------------------------ Sort (cost=0.00..0.00 rows=0 width=0) @@ -94,7 +94,7 @@ GROUP BY c_comment ORDER BY revenue DESC; -LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer_append" ][ reference join "nation" ] +LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single range partition join "customer_append" ][ reference join "nation" ] QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) @@ -135,7 +135,7 @@ WHERE AND l_shipmode in ('AIR', 'AIR REG', 'TRUCK') AND l_shipinstruct = 'DELIVER IN PERSON' ); -LOG: join order: [ "lineitem" ][ single partition join "part_append" ] +LOG: join order: [ "lineitem" ][ single range partition join "part_append" ] QUERY PLAN -------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) @@ -154,7 +154,7 @@ WHERE c_custkey = o_custkey GROUP BY l_partkey; -LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part_append" ][ single partition join "customer_append" ] +LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single range partition join "part_append" ][ single range partition join "customer_append" ] QUERY PLAN -------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index bca1f09c1..2ac6780dc 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -967,14 +967,15 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_repartition_joins TO ON; SELECT * FROM articles_hash a, articles_hash b - WHERE a.id = b.id AND a.author_id = 1; + WHERE a.id = b.id AND a.author_id = 1 +ORDER BY 1 DESC; id | author_id | title | word_count | id | author_id | title | word_count ----+-----------+--------------+------------+----+-----------+--------------+------------ 41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814 - 11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347 31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271 - 1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572 21 | 1 | arcading | 5890 | 21 | 1 | 
arcading | 5890 + 11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347 + 1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572 (5 rows) SET citus.enable_repartition_joins TO OFF; @@ -1779,15 +1780,15 @@ DETAIL: Creating dependency on merge taskId 2 DEBUG: pruning merge fetch taskId 3 DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 4 -DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 4 DEBUG: pruning merge fetch taskId 9 DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 11 -DETAIL: Creating dependency on merge taskId 6 -DEBUG: pruning merge fetch taskId 13 DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 13 +DETAIL: Creating dependency on merge taskId 6 DEBUG: pruning merge fetch taskId 15 DETAIL: Creating dependency on merge taskId 8 ERROR: the query contains a join that requires repartitioning diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out index e22d73463..1be9e76b1 100644 --- a/src/test/regress/expected/non_colocated_join_order.out +++ b/src/test/regress/expected/non_colocated_join_order.out @@ -41,7 +41,7 @@ SET citus.shard_replication_factor to 1; -- for interval [8,10] repartition join logic will be triggered. SET citus.enable_repartition_joins to ON; SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; -LOG: join order: [ "test_table_1" ][ single partition join "test_table_2" ] +LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ] DEBUG: cannot use real time executor with repartition jobs HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
count diff --git a/src/test/regress/expected/single_hash_repartition_join.out b/src/test/regress/expected/single_hash_repartition_join.out new file mode 100644 index 000000000..62f99afce --- /dev/null +++ b/src/test/regress/expected/single_hash_repartition_join.out @@ -0,0 +1,465 @@ +-- +-- MULTI_SINGLE_HASH_REPARTITION_JOIN +-- +CREATE SCHEMA single_hash_repartition; +SET search_path TO 'single_hash_repartition'; +SET citus.enable_single_hash_repartition_joins TO ON; +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); +CREATE TABLE ref_table (id int, sum int, avg float); +SET citus.shard_count TO 4; +SELECT create_distributed_table('single_hash_repartition_first', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('single_hash_repartition_second', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +SET citus.log_multi_join_order TO ON; +SET client_min_messages TO DEBUG2; +-- a very basic single hash re-partitioning example +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.id = t2.sum; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- the same query with the orders of the tables have changed +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + t2.sum = t1.id; +LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and 
[-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- single hash repartition after bcast joins +EXPLAIN SELECT + count(*) +FROM + ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + r1.id = t1.id AND t2.sum = t1.id; +LOG: join order: [ "single_hash_repartition_second" ][ reference join "ref_table" ][ single hash partition join "single_hash_repartition_first" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- a more complicated join order, first colocated join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.sum = t3.id; +LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: 
join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- a more complicated join order, first hash-repartition join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.sum = t2.sum AND t1.sum = t3.id; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ dual partition join "single_hash_repartition_first" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals 
[1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 14 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 19 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 24 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- single hash repartitioning is not supported between different column types +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.avg = t3.id; +DEBUG: single partition column types do not match +DEBUG: single partition column types do not match +DEBUG: dual partition column types do not match +DEBUG: single partition column types do not match +DEBUG: single partition column types do not match +DEBUG: dual partition column types do not match +DEBUG: single partition column types do not match +DEBUG: single partition column types do not match +DEBUG: dual partition column types do not match +DEBUG: single partition column types do not match +DEBUG: single partition column types do not match +DEBUG: dual partition column types do not match +LOG: join order: [ "single_hash_repartition_first" ][ local partition join "single_hash_repartition_first" ][ cartesian product "single_hash_repartition_second" ] +ERROR: cannot perform distributed planning on this query +DETAIL: Cartesian products are currently unsupported +-- single repartition query in CTE +-- should work fine +EXPLAIN WITH cte1 AS +( + SELECT + t1.id * t2.avg as data + FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 + WHERE + t1.id = t2.sum + AND t1.sum > 5000 + ORDER BY 1 DESC + LIMIT 50 +) +SELECT + count(*) +FROM + cte1, single_hash_repartition_first +WHERE + cte1.data > single_hash_repartition_first.id; +DEBUG: generating subplan 7_1 for CTE cte1: SELECT ((t1.id)::double precision * t2.avg) AS data FROM 
single_hash_repartition.single_hash_repartition_first t1, single_hash_repartition.single_hash_repartition_second t2 WHERE ((t1.id = t2.sum) AND (t1.sum > 5000)) ORDER BY ((t1.id)::double precision * t2.avg) DESC LIMIT 50 +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: push down of limit count: 50 +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- two single repartitions +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.sum AND t2.sum = t3.id; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: join prunable for 
intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 14 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 19 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 24 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- two single repartitions again, but this +-- time the columns of the second join are reversed +EXPLAIN SELECT + avg(t1.avg + t2.avg) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.sum AND t2.id = t3.sum +LIMIT 10; +LOG: join order: [ "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_second" ][ single hash partition join "single_hash_repartition_first" ] +DEBUG: push down of limit count: 10 +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals 
[-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 14 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 19 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 24 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- let's try one more thing: show that non-uniform shard distribution doesn't +-- end up with single hash repartitioning + UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass); +-- the following queries should also be single hash repartition queries +-- note that since we've manually updated the metadata without changing +-- the corresponding data, the results of the queries would be wrong +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.id = t2.sum; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741826] and [-1073741824,-2] +DEBUG: join prunable for intervals [-2147483648,-1073741826] and [0,1073741822] +DEBUG: join prunable for intervals [-2147483648,-1073741826] and [1073741824,2147483646] +DEBUG: join prunable for intervals [-1073741824,-2] and [-2147483648,-1073741826] +DEBUG: join prunable for intervals [-1073741824,-2] and [0,1073741822] +DEBUG: join prunable for intervals [-1073741824,-2] and [1073741824,2147483646] +DEBUG: join prunable for intervals [0,1073741822] and [-2147483648,-1073741826] +DEBUG: join prunable for intervals [0,1073741822] and [-1073741824,-2] +DEBUG: join prunable for intervals [0,1073741822] and [1073741824,2147483646] +DEBUG: join prunable for intervals [1073741824,2147483646] and [-2147483648,-1073741826] +DEBUG: join prunable for intervals [1073741824,2147483646] and [-1073741824,-2] +DEBUG: join prunable for intervals [1073741824,2147483646] and [0,1073741822] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- the following queries should also be single hash repartition queries +-- note that since we've manually updated the metadata without changing +-- the corresponding data, the results of the queries would be wrong +EXPLAIN 
SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.sum = t2.id; +LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 3 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 20 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +RESET client_min_messages; +RESET search_path; +DROP SCHEMA single_hash_repartition CASCADE; +NOTICE: drop cascades to 3 other objects +DETAIL: drop cascades to table single_hash_repartition.single_hash_repartition_first +drop cascades to table single_hash_repartition.single_hash_repartition_second +drop cascades to table single_hash_repartition.ref_table +SET citus.enable_single_hash_repartition_joins TO OFF; diff --git a/src/test/regress/expected/worker_check_invalid_arguments.out b/src/test/regress/expected/worker_check_invalid_arguments.out index fbc91b468..058539fce 100644 --- a/src/test/regress/expected/worker_check_invalid_arguments.out +++ b/src/test/regress/expected/worker_check_invalid_arguments.out @@ -46,7 +46,7 @@ ERROR: partition column type 20 and split point type 25 do not match -- Check that we fail with bad partition column type on hash partitioning SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Name, :Bad_Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); ERROR: partition column types 25 and 20 do not match -- Now, partition table data using valid arguments SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text, diff --git a/src/test/regress/expected/worker_hash_partition.out b/src/test/regress/expected/worker_hash_partition.out index 61725f4f7..63ed5f27d 100644 --- a/src/test/regress/expected/worker_hash_partition.out +++ b/src/test/regress/expected/worker_hash_partition.out @@ -7,15 +7,11 @@ \set Partition_Column_Text '\'l_orderkey\'' \set Partition_Column_Type '\'int8\'' \set Partition_Count 4 +\set hashTokenIncrement 1073741824 \set Select_Query_Text '\'SELECT * FROM lineitem\'' \set Select_All 'SELECT *' --- Hash functions internally return unsigned 32-bit integers. 
However, when --- called externally, the return value becomes a signed 32-bit integer. We hack --- around this conversion issue by bitwise-anding the hash results. Note that --- this only works because we are modding with 4. The proper Hash_Mod_Function --- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4) --- else ((hashint8(l_orderkey) + 4294967296) % 4) end). -\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )' +-- The hash function is mapped to behave exactly as the Citus planner does +\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' \set Table_Part_00 lineitem_hash_part_00 \set Table_Part_01 lineitem_hash_part_01 \set Table_Part_02 lineitem_hash_part_02 @@ -23,7 +19,7 @@ -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type::regtype, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); worker_hash_partition_table ----------------------------- @@ -36,13 +32,25 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003'; SELECT COUNT(*) FROM :Table_Part_00; count ------- - 3081 + 2885 +(1 row) + +SELECT COUNT(*) FROM :Table_Part_01; + count +------- + 3009 +(1 row) + +SELECT COUNT(*) FROM :Table_Part_02; + count +------- + 3104 (1 row) SELECT COUNT(*) FROM :Table_Part_03; count ------- - 2935 + 3002 (1 row) -- We first compute the difference of partition tables against the base table. diff --git a/src/test/regress/expected/worker_hash_partition_complex.out b/src/test/regress/expected/worker_hash_partition_complex.out index 2dfbe3702..a90f4d505 100644 --- a/src/test/regress/expected/worker_hash_partition_complex.out +++ b/src/test/regress/expected/worker_hash_partition_complex.out @@ -7,9 +7,10 @@ \set Partition_Column_Text '\'l_partkey\'' \set Partition_Column_Type 23 \set Partition_Count 4 +\set hashTokenIncrement 1073741824 \set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment' \set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08' -\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )' +\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' \set Table_Part_00 lineitem_hash_complex_part_00 \set Table_Part_01 lineitem_hash_complex_part_01 \set Table_Part_02 lineitem_hash_complex_part_02 @@ -22,7 +23,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, ' WHERE l_shipdate >= date ''1992-01-15''' ' AND l_discount between 0.02 AND 0.08', :Partition_Column_Text, :Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); worker_hash_partition_table ----------------------------- @@ -36,13 +37,13 @@ COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101104/p_00003'; SELECT COUNT(*) FROM :Table_Part_00; count ------- - 1988 + 1883 (1 row) SELECT COUNT(*) FROM :Table_Part_03; count ------- - 1881 + 1913 (1 row) -- We first compute the difference of partition tables against the base table. 
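The rewritten Hash_Mod_Function above can be sanity-checked in isolation: subtracting the minimum hash token (-2147483648) and integer-dividing by the hashTokenIncrement of 1073741824 maps every signed 32-bit hash value onto buckets 0 through 3, which correspond exactly to the four intervals printed in the join-pruning output. A minimal standalone sketch (the VALUES list feeds boundary hash values directly instead of hashing lineitem rows):
SELECT v AS hash_value,
       (v::int8 - (-2147483648)) / 1073741824 AS bucket
FROM (VALUES (-2147483648), (-1073741825), (-1073741824), (-1),
             (0), (1073741823), (1073741824), (2147483647)) AS t(v);
-- buckets come out as 0, 0, 1, 1, 2, 2, 3, 3, matching the intervals
-- [-2147483648,-1073741825], [-1073741824,-1], [0,1073741823], [1073741824,2147483647]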
diff --git a/src/test/regress/expected/worker_null_data_partition.out b/src/test/regress/expected/worker_null_data_partition.out index 11881fbc9..344658142 100644 --- a/src/test/regress/expected/worker_null_data_partition.out +++ b/src/test/regress/expected/worker_null_data_partition.out @@ -98,14 +98,15 @@ SELECT COUNT(*) AS diff_rhs_02 FROM ( -- into the 0th repartition bucket. \set Hash_TaskId 101107 \set Partition_Count 4 -\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )' +\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' +\set hashTokenIncrement 1073741824 \set Hash_Table_Part_00 supplier_hash_part_00 \set Hash_Table_Part_01 supplier_hash_part_01 \set Hash_Table_Part_02 supplier_hash_part_02 -- Run select query, and apply hash partitioning on query results SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); worker_hash_partition_table ----------------------------- @@ -117,13 +118,13 @@ COPY :Hash_Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_000 SELECT COUNT(*) FROM :Hash_Table_Part_00; count ------- - 298 + 282 (1 row) SELECT COUNT(*) FROM :Hash_Table_Part_02; count ------- - 203 + 102 (1 row) -- We first compute the difference of partition tables against the base table. diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4e7b8583b..8156c9e2d 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -63,7 +63,7 @@ test: multi_average_expression multi_working_columns multi_having_pushdown test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg test: multi_agg_type_conversion multi_count_type_conversion -test: multi_partition_pruning +test: multi_partition_pruning single_hash_repartition_join test: multi_join_pruning multi_hash_pruning test: multi_null_minmax_value_pruning test: multi_query_directory_cleanup diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 45f6ff983..dc2d7dcb1 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -134,6 +134,8 @@ ALTER EXTENSION citus UPDATE TO '7.2-2'; ALTER EXTENSION citus UPDATE TO '7.2-3'; ALTER EXTENSION citus UPDATE TO '7.3-3'; ALTER EXTENSION citus UPDATE TO '7.4-1'; +ALTER EXTENSION citus UPDATE TO '7.4-2'; +ALTER EXTENSION citus UPDATE TO '7.4-3'; -- show running version SHOW citus.version; diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 1d9216577..3c5f4b69a 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -461,7 +461,8 @@ SELECT * SET citus.enable_repartition_joins TO ON; SELECT * FROM articles_hash a, articles_hash b - WHERE a.id = b.id AND a.author_id = 1; + WHERE a.id = b.id AND a.author_id = 1 +ORDER BY 1 DESC; SET citus.enable_repartition_joins TO OFF; -- queries which hit more than 1 shards are not router plannable or executable -- handled by real-time executor diff --git a/src/test/regress/sql/single_hash_repartition_join.sql b/src/test/regress/sql/single_hash_repartition_join.sql new file mode 100644 index 000000000..8745ac07c --- /dev/null +++ 
b/src/test/regress/sql/single_hash_repartition_join.sql @@ -0,0 +1,143 @@ +-- +-- MULTI_SINGLE_HASH_REPARTITION_JOIN +-- + +CREATE SCHEMA single_hash_repartition; + +SET search_path TO 'single_hash_repartition'; +SET citus.enable_single_hash_repartition_joins TO ON; + +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); +CREATE TABLE ref_table (id int, sum int, avg float); + +SET citus.shard_count TO 4; + +SELECT create_distributed_table('single_hash_repartition_first', 'id'); +SELECT create_distributed_table('single_hash_repartition_second', 'id'); +SELECT create_reference_table('ref_table'); + +SET citus.log_multi_join_order TO ON; +SET client_min_messages TO DEBUG2; + +-- a very basic single hash re-partitioning example +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.id = t2.sum; + +-- the same query with the order of the tables changed +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + t2.sum = t1.id; + +-- single hash repartition after bcast joins +EXPLAIN SELECT + count(*) +FROM + ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + r1.id = t1.id AND t2.sum = t1.id; + +-- a more complicated join order, first colocated join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.sum = t3.id; + + +-- a more complicated join order, first hash-repartition join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.sum = t2.sum AND t1.sum = t3.id; + +-- single hash repartitioning is not supported between different column types +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.avg = t3.id; + +-- single repartition query in CTE +-- should work fine +EXPLAIN WITH cte1 AS +( + SELECT + t1.id * t2.avg as data + FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 + WHERE + t1.id = t2.sum + AND t1.sum > 5000 + ORDER BY 1 DESC + LIMIT 50 +) +SELECT + count(*) +FROM + cte1, single_hash_repartition_first +WHERE + cte1.data > single_hash_repartition_first.id; + + +-- two single repartitions +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.sum AND t2.sum = t3.id; + + +-- two single repartitions again, but this +-- time the columns of the second join are reversed +EXPLAIN SELECT + avg(t1.avg + t2.avg) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.sum AND t2.id = t3.sum +LIMIT 10; + +-- let's try one more thing: show that non-uniform shard distribution doesn't +-- end up with single hash repartitioning + + UPDATE pg_dist_shard SET shardmaxvalue = shardmaxvalue::int - 1 WHERE logicalrelid IN ('single_hash_repartition_first'::regclass); + +-- the following queries should also be single hash repartition queries +-- note that since we've manually updated the metadata without changing +-- the corresponding data, the results of the queries 
would be wrong +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.id = t2.sum; + +-- the following queries should also be single hash repartition queries +-- note that since we've manually updated the metadata without changing +-- the corresponding data, the results of the queries would be wrong +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_second t2 +WHERE + t1.sum = t2.id; + +RESET client_min_messages; + +RESET search_path; +DROP SCHEMA single_hash_repartition CASCADE; +SET citus.enable_single_hash_repartition_joins TO OFF; + diff --git a/src/test/regress/sql/worker_check_invalid_arguments.sql b/src/test/regress/sql/worker_check_invalid_arguments.sql index abceee14c..c851d8b16 100644 --- a/src/test/regress/sql/worker_check_invalid_arguments.sql +++ b/src/test/regress/sql/worker_check_invalid_arguments.sql @@ -54,7 +54,7 @@ SELECT worker_range_partition_table(:JobId, :TaskId, :Select_Query_Text, SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Name, :Bad_Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); -- Now, partition table data using valid arguments diff --git a/src/test/regress/sql/worker_hash_partition.sql b/src/test/regress/sql/worker_hash_partition.sql index 024b46ed7..6137638fe 100644 --- a/src/test/regress/sql/worker_hash_partition.sql +++ b/src/test/regress/sql/worker_hash_partition.sql @@ -10,18 +10,12 @@ \set Partition_Column_Text '\'l_orderkey\'' \set Partition_Column_Type '\'int8\'' \set Partition_Count 4 - +\set hashTokenIncrement 1073741824 \set Select_Query_Text '\'SELECT * FROM lineitem\'' \set Select_All 'SELECT *' --- Hash functions internally return unsigned 32-bit integers. However, when --- called externally, the return value becomes a signed 32-bit integer. We hack --- around this conversion issue by bitwise-anding the hash results. Note that --- this only works because we are modding with 4. The proper Hash_Mod_Function --- would be (case when hashint8(l_orderkey) >= 0 then (hashint8(l_orderkey) % 4) --- else ((hashint8(l_orderkey) + 4294967296) % 4) end). - -\set Hash_Mod_Function '( (hashint8(l_orderkey) & 2147483647) % 4 )' +-- The hash function is mapped to behave exactly as the Citus planner does +\set Hash_Mod_Function '( hashint8(l_orderkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' \set Table_Part_00 lineitem_hash_part_00 \set Table_Part_01 lineitem_hash_part_01 @@ -32,7 +26,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type::regtype, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000'; COPY :Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00001'; @@ -40,6 +34,8 @@ COPY :Table_Part_02 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00002'; COPY :Table_Part_03 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00003'; SELECT COUNT(*) FROM :Table_Part_00; +SELECT COUNT(*) FROM :Table_Part_01; +SELECT COUNT(*) FROM :Table_Part_02; SELECT COUNT(*) FROM :Table_Part_03; -- We first compute the difference of partition tables against the base table. 
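The literal split-point array ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[] passed to worker_hash_partition_table in these tests mirrors the hash-token ranges that create_distributed_table assigns when citus.shard_count is 4. A hedged way to see that correspondence (illustrative only; it assumes a four-shard hash-distributed table such as single_hash_repartition_first from the test above, inspected before its metadata is mutated and the schema dropped):
SELECT shardminvalue::int4 AS split_point,
       shardmaxvalue::int4 AS interval_max
FROM pg_dist_shard
WHERE logicalrelid = 'single_hash_repartition.single_hash_repartition_first'::regclass
ORDER BY 1;
-- the split_point column should read: -2147483648, -1073741824, 0, 1073741824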
diff --git a/src/test/regress/sql/worker_hash_partition_complex.sql b/src/test/regress/sql/worker_hash_partition_complex.sql index 5b860ec71..722109332 100644 --- a/src/test/regress/sql/worker_hash_partition_complex.sql +++ b/src/test/regress/sql/worker_hash_partition_complex.sql @@ -10,11 +10,12 @@ \set Partition_Column_Text '\'l_partkey\'' \set Partition_Column_Type 23 \set Partition_Count 4 +\set hashTokenIncrement 1073741824 \set Select_Columns 'SELECT l_partkey, l_discount, l_shipdate, l_comment' \set Select_Filters 'l_shipdate >= date \'1992-01-15\' AND l_discount between 0.02 AND 0.08' -\set Hash_Mod_Function '( (hashint4(l_partkey) & 2147483647) % 4 )' +\set Hash_Mod_Function '( hashint4(l_partkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' \set Table_Part_00 lineitem_hash_complex_part_00 \set Table_Part_01 lineitem_hash_complex_part_01 @@ -30,7 +31,7 @@ SELECT worker_hash_partition_table(:JobId, :TaskId, ' WHERE l_shipdate >= date ''1992-01-15''' ' AND l_discount between 0.02 AND 0.08', :Partition_Column_Text, :Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); -- Copy partitioned data files into tables for testing purposes diff --git a/src/test/regress/sql/worker_null_data_partition.sql b/src/test/regress/sql/worker_null_data_partition.sql index 557904187..b6fa47d3d 100644 --- a/src/test/regress/sql/worker_null_data_partition.sql +++ b/src/test/regress/sql/worker_null_data_partition.sql @@ -69,7 +69,8 @@ SELECT COUNT(*) AS diff_rhs_02 FROM ( \set Hash_TaskId 101107 \set Partition_Count 4 -\set Hash_Mod_Function '( (hashint4(s_nationkey) & 2147483647) % 4 )' +\set Hash_Mod_Function '( hashint4(s_nationkey)::int8 - (-2147483648))::int8 / :hashTokenIncrement::int8' +\set hashTokenIncrement 1073741824 \set Hash_Table_Part_00 supplier_hash_part_00 \set Hash_Table_Part_01 supplier_hash_part_01 @@ -79,7 +80,7 @@ SELECT COUNT(*) AS diff_rhs_02 FROM ( SELECT worker_hash_partition_table(:JobId, :Hash_TaskId, :Select_Query_Text, :Partition_Column_Text, :Partition_Column_Type, - :Partition_Count); + ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); COPY :Hash_Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00000'; COPY :Hash_Table_Part_01 FROM 'base/pgsql_job_cache/job_201010/task_101107/p_00001';
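Note that the ERROR/HINT pairs in the expected outputs above are intentional: the tests leave citus.enable_repartition_joins off, so the planner logs the chosen join order and then refuses the repartition join. A minimal sketch of how one would actually execute such a query interactively, reusing the table names from single_hash_repartition_join.sql (not part of the regression suite):
SET citus.enable_single_hash_repartition_joins TO ON;
SET citus.enable_repartition_joins TO ON;
-- planned as a single hash partition join, per the LOG lines above
SELECT count(*)
FROM single_hash_repartition_first t1, single_hash_repartition_second t2
WHERE t1.id = t2.sum;
RESET citus.enable_repartition_joins;
RESET citus.enable_single_hash_repartition_joins;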