Fix partition column index issue (#4591)

* Fix partition column index issue

We send column names to the worker_hash/range_partition_table functions,
and in those functions we look the column name up in the tuple descriptor
to find its index. That index is then used to decide which bucket the
current row is sent to for the repartition.

This becomes a problem when the tuple descriptor contains duplicate
column names: we can pick the wrong index, so the partitioned data is
sent to the wrong workers, and the result can miss rows because each
worker holds a different range of the data.

An example:
The TupleDescriptor of one table contains "trip_id", "car_id", "car_id";
the other table's contains only "car_id". Assuming both tables are
partitioned by car_id, it is ambiguous which of the duplicate columns
should decide the bucket number for the first table. If value 2 goes to
bucket 2 and value 3 goes to bucket 3, it is not clear which bucket the
row "1 2 3" (trip_id, car_id, car_id) goes to.

As a solution we send the index of the partition column in the
targetList instead of the column name.

The old API is kept so that upgraded workers still work with an old
coordinator during a rolling upgrade (though that path still has the
same bug).

* Use the same method so that backporting is easier
SaitTalhaNisanci 2021-01-29 14:40:40 +03:00 committed by GitHub
parent 1ba399f5ca
commit 738825cc38
7 changed files with 183 additions and 70 deletions

View File

@@ -50,8 +50,10 @@
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/query_pushdown_planning.h"
+#include "distributed/query_utils.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
+#include "distributed/string_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -225,8 +227,7 @@ static void AssignDataFetchDependencies(List *taskList);
 static uint32 TaskListHighestTaskId(List *taskList);
 static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList);
 static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
-                                       char *partitionColumnName);
-static char * ColumnName(Var *column, List *rangeTableList);
+                                       uint32 partitionColumnIndex);
 static List * MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList,
                             uint32 taskIdIndex);
 static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId);
@@ -237,6 +238,7 @@ static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
 static List * FetchEqualityAttrNumsForRTEOpExpr(OpExpr *opExpr);
 static List * FetchEqualityAttrNumsForRTEBoolExpr(BoolExpr *boolExpr);
 static List * FetchEqualityAttrNumsForList(List *nodeList);
+static int PartitionColumnIndex(Var *targetVar, List *targetList);
 #if PG_VERSION_NUM >= PG_VERSION_13
 static List * GetColumnOriginalIndexes(Oid relationId);
 #endif
@@ -4477,11 +4479,10 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
 {
 	List *mapTaskList = NIL;
 	Query *filterQuery = mapMergeJob->job.jobQuery;
-	List *rangeTableList = filterQuery->rtable;
 	ListCell *filterTaskCell = NULL;
 	Var *partitionColumn = mapMergeJob->partitionColumn;
-	char *partitionColumnName = NULL;
+	uint32 partitionColumnResNo = 0;

 	List *groupClauseList = filterQuery->groupClause;
 	if (groupClauseList != NIL)
 	{
@@ -4490,29 +4491,19 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
 		                                                           targetEntryList);
 		TargetEntry *groupByTargetEntry = (TargetEntry *) linitial(groupTargetEntryList);
-		partitionColumnName = groupByTargetEntry->resname;
+		partitionColumnResNo = groupByTargetEntry->resno;
 	}
 	else
 	{
-		TargetEntry *targetEntry = tlist_member((Expr *) partitionColumn,
-		                                        filterQuery->targetList);
-		if (targetEntry != NULL)
-		{
-			/* targetEntry->resname may be NULL */
-			partitionColumnName = targetEntry->resname;
-		}
-
-		if (partitionColumnName == NULL)
-		{
-			partitionColumnName = ColumnName(partitionColumn, rangeTableList);
-		}
+		partitionColumnResNo = PartitionColumnIndex(partitionColumn,
+		                                            filterQuery->targetList);
 	}

 	foreach(filterTaskCell, filterTaskList)
 	{
 		Task *filterTask = (Task *) lfirst(filterTaskCell);
 		StringInfo mapQueryString = CreateMapQueryString(mapMergeJob, filterTask,
-		                                                 partitionColumnName);
+		                                                 partitionColumnResNo);

 		/* convert filter query task into map task */
 		Task *mapTask = filterTask;
@@ -4526,12 +4517,40 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
 }


+/*
+ * PartitionColumnIndex finds the index of the given target var.
+ */
+static int
+PartitionColumnIndex(Var *targetVar, List *targetList)
+{
+	TargetEntry *targetEntry = NULL;
+	int resNo = 1;
+	foreach_ptr(targetEntry, targetList)
+	{
+		if (IsA(targetEntry->expr, Var))
+		{
+			Var *candidateVar = (Var *) targetEntry->expr;
+			if (candidateVar->varattno == targetVar->varattno &&
+				candidateVar->varno == targetVar->varno)
+			{
+				return resNo;
+			}
+			resNo++;
+		}
+	}
+
+	ereport(ERROR, (errmsg("unexpected state: %d varno %d varattno couldn't be found",
+						   targetVar->varno, targetVar->varattno)));
+	return resNo;
+}
+
+
 /*
  * CreateMapQueryString creates and returns the map query string for the given filterTask.
  */
 static StringInfo
 CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
-                     char *partitionColumnName)
+                     uint32 partitionColumnIndex)
 {
 	uint64 jobId = filterTask->jobId;
 	uint32 taskId = filterTask->taskId;
@@ -4577,8 +4596,9 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
 		partitionCommand = HASH_PARTITION_COMMAND;
 	}

+	char *partitionColumnIndextText = ConvertIntToString(partitionColumnIndex);
 	appendStringInfo(mapQueryString, partitionCommand, jobId, taskId,
-	                 filterQueryEscapedText, partitionColumnName,
+	                 filterQueryEscapedText, partitionColumnIndextText,
 	                 partitionColumnTypeFullName, splitPointString->data);

 	return mapQueryString;
 }
@@ -4674,40 +4694,6 @@ RowModifyLevelForQuery(Query *query)
 }


-/*
- * ColumnName resolves the given column's name. The given column could belong to
- * a regular table or to an intermediate table formed to execute a distributed
- * query.
- */
-static char *
-ColumnName(Var *column, List *rangeTableList)
-{
-	char *columnName = NULL;
-	Index tableId = column->varno;
-	AttrNumber columnNumber = column->varattno;
-
-	RangeTblEntry *rangeTableEntry = rt_fetch(tableId, rangeTableList);
-	CitusRTEKind rangeTableKind = GetRangeTblKind(rangeTableEntry);
-
-	if (rangeTableKind == CITUS_RTE_REMOTE_QUERY)
-	{
-		Alias *referenceNames = rangeTableEntry->eref;
-		List *columnNameList = referenceNames->colnames;
-		int columnIndex = columnNumber - 1;
-
-		Value *columnValue = (Value *) list_nth(columnNameList, columnIndex);
-		columnName = strVal(columnValue);
-	}
-	else if (rangeTableKind == CITUS_RTE_RELATION)
-	{
-		Oid relationId = rangeTableEntry->relid;
-		columnName = get_attname(relationId, columnNumber, false);
-	}
-
-	Assert(columnName != NULL);
-	return columnName;
-}
-
-
 /*
  * ArrayObjectToString converts an SQL object to its string representation.
  */
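
The matching rule of the new PartitionColumnIndex helper above can be
modeled in a standalone sketch: entries are compared by (varno, varattno),
the variable's identity, rather than by name, so duplicate column names
cannot collide. SimpleVar and FindTargetIndex below are illustrative
names, not Citus APIs, and the sketch omits the non-Var entries the real
helper skips.

#include <stdio.h>

typedef struct SimpleVar
{
	int varno;      /* which range table entry the column comes from */
	int varattno;   /* attribute number within that entry */
} SimpleVar;

/* return the 1-based position of the target in the list, 0 if absent */
static int
FindTargetIndex(const SimpleVar *target, const SimpleVar *targetList, int count)
{
	for (int i = 0; i < count; i++)
	{
		if (targetList[i].varno == target->varno &&
			targetList[i].varattno == target->varattno)
		{
			return i + 1;   /* 1-based, like a TargetEntry's resno */
		}
	}
	return 0;   /* the real helper raises an ERROR instead */
}

int
main(void)
{
	/* trip_id, car_id (from t1), car_id (from t2): same name, distinct vars */
	SimpleVar targetList[] = { { 1, 1 }, { 1, 2 }, { 2, 2 } };
	SimpleVar partitionColumn = { 2, 2 };

	printf("%d\n", FindTargetIndex(&partitionColumn, targetList, 3));   /* 3 */
	return 0;
}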

View File

@@ -67,4 +67,3 @@ DROP FUNCTION pg_catalog.master_create_worker_shards(text, integer, integer);
 DROP FUNCTION pg_catalog.mark_tables_colocated(regclass, regclass[]);

 #include "udfs/citus_shard_sizes/10.0-1.sql"
 #include "udfs/citus_shards/10.0-1.sql"

View File

@@ -11,16 +11,17 @@
  */
 #include "postgres.h"

+#include "nodes/primnodes.h"
 #include "catalog/pg_class.h"
 #include "distributed/query_utils.h"
 #include "distributed/version_compat.h"
+#include "distributed/listutils.h"
 #include "nodes/nodeFuncs.h"

 static bool CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry);

 /*
  * ExtractRangeTableList walks over a tree to gather entries.
  * Execution is parameterized by passing walkerMode flag via ExtractRangeTableWalkerContext

View File

@@ -70,7 +70,8 @@ static void RenameDirectory(StringInfo oldDirectoryName, StringInfo newDirectory
 static void FileOutputStreamWrite(FileOutputStream *file, StringInfo dataToWrite);
 static void FileOutputStreamFlush(FileOutputStream *file);
 static void FilterAndPartitionTable(const char *filterQuery,
-                                    const char *columnName, Oid columnType,
+                                    char *partitionColumnName,
+                                    int partitionColumnIndex, Oid columnType,
                                     PartitionIdFunction partitionIdFunction,
                                     const void *partitionIdContext,
                                     FileOutputStream *partitionFileArray,
@@ -86,7 +87,9 @@ static uint32 HashPartitionId(Datum partitionValue, Oid partitionCollation,
                               const void *context);
 static StringInfo UserPartitionFilename(StringInfo directoryName, uint32 partitionId);
 static bool FileIsLink(const char *filename, struct stat filestat);
+static void PartitionColumnIndexOrPartitionColumnName(char *partitionColumnNameCandidate,
+                                                      char **partitionColumnName,
+                                                      uint32 *partitionColumnIndex);

 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(worker_range_partition_table);
@@ -110,12 +113,19 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
 	uint32 taskId = PG_GETARG_UINT32(1);
 	text *filterQueryText = PG_GETARG_TEXT_P(2);
 	text *partitionColumnText = PG_GETARG_TEXT_P(3);
+	char *partitionColumnNameCandidate = text_to_cstring(partitionColumnText);
+	char *partitionColumnName = NULL;
+	uint32 partitionColumnIndex = 0;
+	PartitionColumnIndexOrPartitionColumnName(partitionColumnNameCandidate,
+	                                          &partitionColumnName,
+	                                          &partitionColumnIndex);
 	Oid partitionColumnType = PG_GETARG_OID(4);
 	ArrayType *splitPointObject = PG_GETARG_ARRAYTYPE_P(5);

 	const char *filterQuery = text_to_cstring(filterQueryText);
-	const char *partitionColumn = text_to_cstring(partitionColumnText);

 	/* first check that array element's and partition column's types match */
 	Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
@@ -152,7 +162,8 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
 	FileBufferSizeInBytes = FileBufferSize(PartitionBufferSize, fileCount);

 	/* call the partitioning function that does the actual work */
-	FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
+	FilterAndPartitionTable(filterQuery, partitionColumnName, partitionColumnIndex,
+	                        partitionColumnType,
 	                        &RangePartitionId, (const void *) partitionContext,
 	                        partitionFileArray, fileCount);
@@ -160,7 +171,6 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
 	ClosePartitionFiles(partitionFileArray, fileCount);
 	CitusRemoveDirectory(taskDirectory->data);
 	RenameDirectory(taskAttemptDirectory, taskDirectory);
-
 	PG_RETURN_VOID();
 }
@@ -182,11 +192,19 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 	uint32 taskId = PG_GETARG_UINT32(1);
 	text *filterQueryText = PG_GETARG_TEXT_P(2);
 	text *partitionColumnText = PG_GETARG_TEXT_P(3);
+	char *partitionColumnNameCandidate = text_to_cstring(partitionColumnText);
+	char *partitionColumnName = NULL;
+	uint32 partitionColumnIndex = 0;
+	PartitionColumnIndexOrPartitionColumnName(partitionColumnNameCandidate,
+	                                          &partitionColumnName,
+	                                          &partitionColumnIndex);
 	Oid partitionColumnType = PG_GETARG_OID(4);
 	ArrayType *hashRangeObject = PG_GETARG_ARRAYTYPE_P(5);

 	const char *filterQuery = text_to_cstring(filterQueryText);
-	const char *partitionColumn = text_to_cstring(partitionColumnText);

 	Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject);
 	int32 partitionCount = ArrayObjectCount(hashRangeObject);
@@ -226,7 +244,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 	FileBufferSizeInBytes = FileBufferSize(PartitionBufferSize, fileCount);

 	/* call the partitioning function that does the actual work */
-	FilterAndPartitionTable(filterQuery, partitionColumn, partitionColumnType,
+	FilterAndPartitionTable(filterQuery, partitionColumnName, partitionColumnIndex,
+	                        partitionColumnType,
 	                        &HashPartitionId, (const void *) partitionContext,
 	                        partitionFileArray, fileCount);
@@ -234,11 +253,43 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
 	ClosePartitionFiles(partitionFileArray, fileCount);
 	CitusRemoveDirectory(taskDirectory->data);
 	RenameDirectory(taskAttemptDirectory, taskDirectory);
-
 	PG_RETURN_VOID();
 }


+/*
+ * PartitionColumnIndexOrPartitionColumnName either sets partitionColumnName or
+ * partitionColumnIndex. See the comment below for why.
+ */
+static void
+PartitionColumnIndexOrPartitionColumnName(char *partitionColumnNameCandidate,
+                                          char **partitionColumnName,
+                                          uint32 *partitionColumnIndex)
+{
+	char *endptr = NULL;
+	uint32 partitionColumnIndexCandidate =
+		strtoul(partitionColumnNameCandidate, &endptr, 10 /* base */);
+	if (endptr == partitionColumnNameCandidate)
+	{
+		/*
+		 * There was a bug around using the column name in the
+		 * worker_[hash|range]_partition_table APIs, and one of the solutions
+		 * was to send the partition column index directly to these APIs.
+		 * However, that would change the API signature and introduce
+		 * difficulties in upgrade paths. Instead of changing the signature,
+		 * we send the partition column index as text. During a rolling
+		 * upgrade, when a worker is upgraded but the coordinator is not, the
+		 * text may still hold a column name rather than an index, so we rely
+		 * on detecting that with a parse failure here.
+		 */
+		*partitionColumnName = partitionColumnNameCandidate;
+	}
+	else
+	{
+		*partitionColumnIndex = partitionColumnIndexCandidate;
+	}
+}
+
+
 /*
  * SyntheticShardIntervalArrayForShardMinValues returns a shard interval pointer array
  * which gets the shardMinValues from the input shardMinValues array. Note that
@@ -845,14 +896,14 @@ FileOutputStreamFlush(FileOutputStream *file)
  */
 static void
 FilterAndPartitionTable(const char *filterQuery,
-                        const char *partitionColumnName, Oid partitionColumnType,
+                        char *partitionColumnName,
+                        int partitionColumnIndex, Oid partitionColumnType,
                         PartitionIdFunction partitionIdFunction,
                         const void *partitionIdContext,
                         FileOutputStream *partitionFileArray,
                         uint32 fileCount)
 {
 	FmgrInfo *columnOutputFunctions = NULL;
-	int partitionColumnIndex = 0;
 	Oid partitionColumnTypeId = InvalidOid;
 	Oid partitionColumnCollation = InvalidOid;
@@ -888,8 +939,14 @@ FilterAndPartitionTable(const char *filterQuery,
 	{
 		ereport(ERROR, (errmsg("no partition to read into")));
 	}

-	partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName);
+	if (partitionColumnName != NULL)
+	{
+		/*
+		 * in the old API, the partition column name is used
+		 * to determine partitionColumnIndex
+		 */
+		partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName);
+	}
 	partitionColumnTypeId = SPI_gettypeid(rowDescriptor, partitionColumnIndex);
 	partitionColumnCollation = TupleDescAttr(rowDescriptor, partitionColumnIndex -
 	                                         1)->attcollation;

View File

@@ -13,6 +13,7 @@
 #include "postgres.h"

 #include "nodes/pg_list.h"
+#include "nodes/primnodes.h"

 /* Enum to define execution flow of ExtractRangeTableList */
 typedef enum ExtractRangeTableMode

View File

@@ -117,9 +117,58 @@ WHERE
 (7 rows)

 SET citus.enable_single_hash_repartition_joins TO OFF;
+--issue 4315
+create table cars (car_id int);
+insert into cars select s from generate_series(1,10) s;
+create table trips (trip_id int, car_id int);
+insert into trips select s % 10, s % 11 from generate_series(1, 100) s;
+-- the result of this should be the same when the tables are distributed
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+ count
+---------------------------------------------------------------------
+   829
+(1 row)
+
+select create_distributed_table('trips', 'trip_id');
+NOTICE:  Copying data from local table...
+NOTICE:  copying the data has completed
+DETAIL:  The local data in the table is no longer visible, but is still on disk.
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$adaptive_executor.trips$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('cars', 'car_id');
+NOTICE:  Copying data from local table...
+NOTICE:  copying the data has completed
+DETAIL:  The local data in the table is no longer visible, but is still on disk.
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$adaptive_executor.cars$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+set citus.enable_repartition_joins to on;
+set citus.enable_single_hash_repartition_joins to off;
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+ count
+---------------------------------------------------------------------
+   829
+(1 row)
+
+set citus.enable_single_hash_repartition_joins to on;
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+ count
+---------------------------------------------------------------------
+   829
+(1 row)
+
 DROP SCHEMA adaptive_executor CASCADE;
-NOTICE:  drop cascades to 4 other objects
+NOTICE:  drop cascades to 6 other objects
 DETAIL:  drop cascades to table ab
 drop cascades to table single_hash_repartition_first
 drop cascades to table single_hash_repartition_second
 drop cascades to table ref_table
+drop cascades to table cars
+drop cascades to table trips

View File

@@ -60,4 +60,24 @@ WHERE
 SET citus.enable_single_hash_repartition_joins TO OFF;

+--issue 4315
+create table cars (car_id int);
+insert into cars select s from generate_series(1,10) s;
+create table trips (trip_id int, car_id int);
+insert into trips select s % 10, s % 11 from generate_series(1, 100) s;
+
+-- the result of this should be the same when the tables are distributed
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+
+select create_distributed_table('trips', 'trip_id');
+select create_distributed_table('cars', 'car_id');
+
+set citus.enable_repartition_joins to on;
+set citus.enable_single_hash_repartition_joins to off;
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+set citus.enable_single_hash_repartition_joins to on;
+select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
+
 DROP SCHEMA adaptive_executor CASCADE;