mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into eag/test-workflow
commit 092b9a0187
@@ -1394,9 +1394,6 @@ static StripeMetadata *
 UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 						uint64 dataLength, uint64 rowCount, uint64 chunkCount)
 {
-	SnapshotData dirtySnapshot;
-	InitDirtySnapshot(dirtySnapshot);
-
 	ScanKeyData scanKey[2];
 	ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
 				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(storageId));
@@ -1405,23 +1402,16 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 
 	Oid columnarStripesOid = ColumnarStripeRelationId();
 
-#if PG_VERSION_NUM >= 180000
-
-	/* CatalogTupleUpdate performs a normal heap UPDATE -> RowExclusiveLock */
-	const LOCKMODE openLockMode = RowExclusiveLock;
-#else
-
-	/* In-place update never changed tuple length -> AccessShareLock was enough */
-	const LOCKMODE openLockMode = AccessShareLock;
-#endif
-
-	Relation columnarStripes = table_open(columnarStripesOid, openLockMode);
+	Relation columnarStripes = table_open(columnarStripesOid, AccessShareLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
 
 	Oid indexId = ColumnarStripePKeyIndexRelationId();
 	bool indexOk = OidIsValid(indexId);
-	SysScanDesc scanDescriptor = systable_beginscan(columnarStripes, indexId, indexOk,
-													&dirtySnapshot, 2, scanKey);
+	void *state;
+	HeapTuple tuple;
+	systable_inplace_update_begin(columnarStripes, indexId, indexOk, NULL,
+								  2, scanKey, &tuple, &state);
 
 	static bool loggedSlowMetadataAccessWarning = false;
 	if (!indexOk && !loggedSlowMetadataAccessWarning)
@@ -1430,8 +1420,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 		loggedSlowMetadataAccessWarning = true;
 	}
 
-	HeapTuple oldTuple = systable_getnext(scanDescriptor);
-	if (!HeapTupleIsValid(oldTuple))
+	if (!HeapTupleIsValid(tuple))
 	{
 		ereport(ERROR, (errmsg("attempted to modify an unexpected stripe, "
 							   "columnar storage with id=" UINT64_FORMAT
@@ -1439,6 +1428,11 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 						storageId, stripeId)));
 	}
 
+	/*
+	 * systable_inplace_update_finish already doesn't allow changing size of the original
+	 * tuple, so we don't allow setting any Datum's to NULL values.
+	 */
+
 	Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
 	bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
 	bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
@@ -1453,43 +1447,21 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
 	newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
 	newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
 
-	HeapTuple modifiedTuple = heap_modify_tuple(oldTuple,
+	tuple = heap_modify_tuple(tuple,
 							  tupleDescriptor,
 							  newValues,
 							  newNulls,
 							  update);
 
-#if PG_VERSION_NUM < PG_VERSION_18
-
-	/*
-	 * heap_inplace_update already doesn't allow changing size of the original
-	 * tuple, so we don't allow setting any Datum's to NULL values.
-	 */
-	heap_inplace_update(columnarStripes, modifiedTuple);
-
-	/*
-	 * Existing tuple now contains modifications, because we used
-	 * heap_inplace_update().
-	 */
-	HeapTuple newTuple = oldTuple;
-#else
-
-	/* Regular catalog UPDATE keeps indexes in sync */
-	CatalogTupleUpdate(columnarStripes, &oldTuple->t_self, modifiedTuple);
-	HeapTuple newTuple = modifiedTuple;
-#endif
+	systable_inplace_update_finish(state, tuple);
+
+	StripeMetadata *modifiedStripeMetadata = BuildStripeMetadata(columnarStripes,
+																 tuple);
 
 	CommandCounterIncrement();
 
-	/*
-	 * Must not pass modifiedTuple, because BuildStripeMetadata expects a real
-	 * heap tuple with MVCC fields.
-	 */
-	StripeMetadata *modifiedStripeMetadata =
-		BuildStripeMetadata(columnarStripes, newTuple);
-
-	systable_endscan(scanDescriptor);
-	table_close(columnarStripes, openLockMode);
+	heap_freetuple(tuple);
+	table_close(columnarStripes, AccessShareLock);
 
 	pfree(newValues);
 	pfree(newNulls);
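Note on the API used above: the hunks drop the open-coded dirty-snapshot scan plus heap_inplace_update()/CatalogTupleUpdate() pair in favor of PostgreSQL's systable_inplace_update_begin()/systable_inplace_update_finish() helpers. A minimal sketch of that calling convention follows; "MyCatalogRelationId", "MyCatalogIndexId", "Anum_mycatalog_key", and "keyValue" are placeholders for illustration, not Citus names:

/*
 * Sketch of the in-place catalog update pattern, under the assumption
 * that the caller already knows the catalog, its index, and the key.
 */
Relation rel = table_open(MyCatalogRelationId, AccessShareLock);

ScanKeyData scanKey[1];
ScanKeyInit(&scanKey[0], Anum_mycatalog_key,	/* hypothetical attribute */
			BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(keyValue));

void *state;
HeapTuple tuple;

/* begin() scans for the tuple and hands back a copy of it (or NULL) */
systable_inplace_update_begin(rel, MyCatalogIndexId, true, NULL,
							  1, scanKey, &tuple, &state);

if (!HeapTupleIsValid(tuple))
{
	ereport(ERROR, (errmsg("tuple not found")));
}

/* mutate the copy here; the in-place write must not change its length */
systable_inplace_update_finish(state, tuple);

heap_freetuple(tuple);
table_close(rel, AccessShareLock);

If the caller decides not to write anything after all, systable_inplace_update_cancel(state) releases the scan state instead of finish().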
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "funcapi.h"
+#include "miscadmin.h"
 
 #include "access/htup_details.h"
 #include "access/xact.h"
@@ -144,6 +145,9 @@ static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
 static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
 									  bool isDistributedQuery,
 									  List *rangeTableList);
+#if PG_VERSION_NUM >= PG_VERSION_18
+static int DisableSelfJoinElimination(void);
+#endif
 
 /* Distributed planner hook */
 PlannedStmt *
@@ -155,6 +159,9 @@ distributed_planner(Query *parse,
 	bool needsDistributedPlanning = false;
 	bool fastPathRouterQuery = false;
 	FastPathRestrictionContext fastPathContext = { 0 };
+#if PG_VERSION_NUM >= PG_VERSION_18
+	int saveNestLevel = -1;
+#endif
 
 	List *rangeTableList = ExtractRangeTableEntryList(parse);
 
@@ -218,6 +225,10 @@ distributed_planner(Query *parse,
 			bool setPartitionedTablesInherited = false;
 			AdjustPartitioningForDistributedPlanning(rangeTableList,
 													 setPartitionedTablesInherited);
 
+#if PG_VERSION_NUM >= PG_VERSION_18
+			saveNestLevel = DisableSelfJoinElimination();
+#endif
 		}
 	}
 
@@ -264,6 +275,13 @@ distributed_planner(Query *parse,
 			planContext.plan = standard_planner(planContext.query, NULL,
 												planContext.cursorOptions,
 												planContext.boundParams);
+#if PG_VERSION_NUM >= PG_VERSION_18
+			if (needsDistributedPlanning)
+			{
+				Assert(saveNestLevel > 0);
+				AtEOXact_GUC(true, saveNestLevel);
+			}
+#endif
 			needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
 																 needsDistributedPlanning,
 																 rangeTableList);
@@ -2791,3 +2809,27 @@ CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool
 
 	return isDistributedQuery;
 }
+
+
+#if PG_VERSION_NUM >= PG_VERSION_18
+
+/*
+ * DisableSelfJoinElimination is used to prevent self join elimination
+ * during distributed query planning to ensure shard queries are correctly
+ * generated. PG18's self join elimination (fc069a3a6) changes the Query
+ * in a way that can cause problems for queries with a mix of Citus and
+ * Postgres tables. Self join elimination is allowed on Postgres tables
+ * only so queries involving shards get the benefit of it.
+ */
+static int
+DisableSelfJoinElimination(void)
+{
+	int NestLevel = NewGUCNestLevel();
+	set_config_option("enable_self_join_elimination", "off",
+					  (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
+					  GUC_ACTION_LOCAL, true, 0, false);
+	return NestLevel;
+}
+
+
+#endif
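Note on the GUC handling above: DisableSelfJoinElimination() leans on PostgreSQL's GUC nest-level machinery, and distributed_planner() closes the nest level with AtEOXact_GUC() right after standard_planner() returns. A minimal sketch of that save/override/restore pattern, with "some.guc_name" as a placeholder setting:

/*
 * Save, locally override, and restore a GUC around a block of code.
 * This mirrors the mechanism in the diff, not any specific Citus code.
 */
int saveNestLevel = NewGUCNestLevel();

/* GUC_ACTION_LOCAL: the override only lives until the nest level closes */
(void) set_config_option("some.guc_name", "off",
						 (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
						 GUC_ACTION_LOCAL, true, 0, false);

/* ... run the code that must see the overridden setting ... */

/* closing the nest level puts the previous value back */
AtEOXact_GUC(true, saveNestLevel);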
@@ -428,11 +428,10 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
 									 ParamListInfo boundParams, bool hasUnresolvedParams,
 									 PlannerRestrictionContext *plannerRestrictionContext)
 {
-	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
-
 	PrepareInsertSelectForCitusPlanner(insertSelectQuery);
 
 	/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
+	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
 	Query *selectQuery = selectRte->subquery;
 
 	bool allowRecursivePlanning = true;
@@ -513,6 +512,24 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
 
 	bool isWrapped = false;
 
+#if PG_VERSION_NUM >= PG_VERSION_18
+
+	/*
+	 * PG18 is stricter about GroupRTE/GroupVar. For INSERT ... SELECT with a GROUP BY,
+	 * flatten the SELECT's targetList and havingQual so Vars point to base RTEs and
+	 * avoid "Unrecognized range table id".
+	 */
+	if (selectRte->subquery->hasGroupRTE)
+	{
+		Query *selectQuery = selectRte->subquery;
+		selectQuery->targetList = (List *)
+			flatten_group_exprs(NULL, selectQuery,
+								(Node *) selectQuery->targetList);
+		selectQuery->havingQual =
+			flatten_group_exprs(NULL, selectQuery, selectQuery->havingQual);
+	}
+#endif
+
 	if (selectRte->subquery->setOperations != NULL)
 	{
 		/*
@@ -1431,11 +1448,6 @@ static DistributedPlan *
 CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
 {
 	Query *insertSelectQuery = copyObject(parse);
 
-	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
-	RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
-	Oid targetRelationId = insertRte->relid;
-
 	DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
 	distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
 
@@ -1450,6 +1462,7 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
 	PrepareInsertSelectForCitusPlanner(insertSelectQuery);
 
 	/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
+	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
 	Query *selectQuery = selectRte->subquery;
 
 	/*
@@ -1472,6 +1485,9 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
 	PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, NULL, cursorOptions,
 											boundParams);
 
+	/* decide whether we can repartition the results */
+	RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
+	Oid targetRelationId = insertRte->relid;
 	bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
 						 IsSupportedRedistributionTarget(targetRelationId);
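All of the INSERT ... SELECT hunks make the same ordering fix: the SELECT range-table entry is now extracted after PrepareInsertSelectForCitusPlanner(), because (per the comment in the diff) that call may change the SELECT query, leaving an earlier selectRte out of date. Schematically, using hypothetical wrapper functions rather than the actual call sites:

/* Hypothetical before/after illustration of the reordering above. */

static Query *
SelectQueryStale(Query *insertSelectQuery)
{
	/* extracted too early: the rewrite below may change what this refers to */
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
	PrepareInsertSelectForCitusPlanner(insertSelectQuery);
	return selectRte->subquery;
}

static Query *
SelectQueryFresh(Query *insertSelectQuery)
{
	PrepareInsertSelectForCitusPlanner(insertSelectQuery);

	/* re-extract once the tree is in its final shape */
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
	return selectRte->subquery;
}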
@@ -73,34 +73,8 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation);
 Datum
 mark_tables_colocated(PG_FUNCTION_ARGS)
 {
-	CheckCitusVersion(ERROR);
-	EnsureCoordinator();
-
-	Oid sourceRelationId = PG_GETARG_OID(0);
-	ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
-
-	int relationCount = ArrayObjectCount(relationIdArrayObject);
-	if (relationCount < 1)
-	{
-		ereport(ERROR, (errmsg("at least one target table is required for this "
-							   "operation")));
-	}
-
-	EnsureTableOwner(sourceRelationId);
-
-	Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
-
-	for (int relationIndex = 0; relationIndex < relationCount; relationIndex++)
-	{
-		Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
-
-		/* we require that the user either owns all tables or is superuser */
-		EnsureTableOwner(nextRelationOid);
-
-		MarkTablesColocated(sourceRelationId, nextRelationOid);
-	}
-
-	PG_RETURN_VOID();
+	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("this function is deprecated and no longer is used")));
 }
@@ -1306,7 +1280,7 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex)
 /*
  * DeleteColocationGroupIfNoTablesBelong function deletes given co-location group if there
  * is no relation in that co-location group. A co-location group may become empty after
- * mark_tables_colocated or upgrade_reference_table UDF calls. In that case we need to
+ * update_distributed_table_colocation UDF calls. In that case we need to
  * remove empty co-location group to prevent orphaned co-location groups.
  */
 void
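The mark_tables_colocated() change follows the usual pattern for retiring a UDF: keep the C entry point so existing SQL-level function definitions still resolve, but have it error out immediately. As a generic sketch with a hypothetical function name:

#include "postgres.h"
#include "fmgr.h"

PG_FUNCTION_INFO_V1(my_deprecated_udf);		/* hypothetical name */

Datum
my_deprecated_udf(PG_FUNCTION_ARGS)
{
	/* refuse to do anything, pointing callers at the replacement */
	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("this function is deprecated")));

	PG_RETURN_VOID();	/* not reached; ereport(ERROR) does not return */
}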
@@ -1200,7 +1200,7 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition
 DETAIL: The target table's partition column should correspond to a partition column in the subquery.
 DEBUG: performing repartitioned INSERT ... SELECT
 RESET client_min_messages;
--- some tests for mark_tables_colocated
+-- some tests for update_distributed_table_colocation
 -- should error out
 SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
 ERROR: relation reference_table_test should be a hash or single shard distributed table
@@ -165,10 +165,289 @@ ORDER BY contype;
  dist_n_after_drop | n | 1
 (2 rows)
 
--- cleanup
-RESET client_min_messages;
+-- Purpose: test self join elimination for distributed, citus local and local tables.
+--
+CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
+CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
+CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text);
+SELECT create_distributed_table('sje_d1', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('sje_d2', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i;
+INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i;
+INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i;
+-- Self-join elimination is applied when distributed tables are involved
+-- The query plan has only one join
+EXPLAIN (costs off)
+select count(1) from sje_d1 INNER
+JOIN sje_d2 u1 USING (id) INNER
+JOIN sje_d2 u2 USING (id) INNER
+JOIN sje_d2 u3 USING (id) INNER
+JOIN sje_d2 u4 USING (id) INNER
+JOIN sje_d2 u5 USING (id) INNER
+JOIN sje_d2 u6 USING (id);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (sje_d1.id = u6.id)
+                           ->  Seq Scan on sje_d1_102012 sje_d1
+                           ->  Hash
+                                 ->  Seq Scan on sje_d2_102016 u6
+(12 rows)
+
+select count(1) from sje_d1 INNER
+JOIN sje_d2 u1 USING (id) INNER
+JOIN sje_d2 u2 USING (id) INNER
+JOIN sje_d2 u3 USING (id) INNER
+JOIN sje_d2 u4 USING (id) INNER
+JOIN sje_d2 u5 USING (id) INNER
+JOIN sje_d2 u6 USING (id);
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- Self-join elimination applied to from list join
+EXPLAIN (costs off)
+SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
+WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (d1.id = u3.id)
+                           ->  Seq Scan on sje_d1_102012 d1
+                           ->  Hash
+                                 ->  Seq Scan on sje_d2_102016 u3
+(12 rows)
+
+SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
+WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- Self-join elimination is not applied when a local table is involved
+-- This is a limitation that will be resolved in citus 14
+EXPLAIN (costs off)
+select count(1) from sje_d1 INNER
+JOIN sje_local u1 USING (id) INNER
+JOIN sje_local u2 USING (id) INNER
+JOIN sje_local u3 USING (id) INNER
+JOIN sje_local u4 USING (id) INNER
+JOIN sje_local u5 USING (id) INNER
+JOIN sje_local u6 USING (id);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         ->  Distributed Subplan XXX_1
+               ->  Seq Scan on sje_local u1
+         ->  Distributed Subplan XXX_2
+               ->  Seq Scan on sje_local u2
+         ->  Distributed Subplan XXX_3
+               ->  Seq Scan on sje_local u3
+         ->  Distributed Subplan XXX_4
+               ->  Seq Scan on sje_local u4
+         ->  Distributed Subplan XXX_5
+               ->  Seq Scan on sje_local u5
+         ->  Distributed Subplan XXX_6
+               ->  Seq Scan on sje_local u6
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (intermediate_result_5.id = sje_d1.id)
+                           ->  Function Scan on read_intermediate_result intermediate_result_5
+                           ->  Hash
+                                 ->  Hash Join
+                                       Hash Cond: (intermediate_result_4.id = sje_d1.id)
+                                       ->  Function Scan on read_intermediate_result intermediate_result_4
+                                       ->  Hash
+                                             ->  Hash Join
+                                                   Hash Cond: (intermediate_result_3.id = sje_d1.id)
+                                                   ->  Function Scan on read_intermediate_result intermediate_result_3
+                                                   ->  Hash
+                                                         ->  Hash Join
+                                                               Hash Cond: (intermediate_result_2.id = sje_d1.id)
+                                                               ->  Function Scan on read_intermediate_result intermediate_result_2
+                                                               ->  Hash
+                                                                     ->  Hash Join
+                                                                           Hash Cond: (intermediate_result_1.id = sje_d1.id)
+                                                                           ->  Function Scan on read_intermediate_result intermediate_result_1
+                                                                           ->  Hash
+                                                                                 ->  Hash Join
+                                                                                       Hash Cond: (intermediate_result.id = sje_d1.id)
+                                                                                       ->  Function Scan on read_intermediate_result intermediate_result
+                                                                                       ->  Hash
+                                                                                             ->  Seq Scan on sje_d1_102012 sje_d1
+(44 rows)
+
+select count(1) from sje_d1 INNER
+JOIN sje_local u1 USING (id) INNER
+JOIN sje_local u2 USING (id) INNER
+JOIN sje_local u3 USING (id) INNER
+JOIN sje_local u4 USING (id) INNER
+JOIN sje_local u5 USING (id) INNER
+JOIN sje_local u6 USING (id);
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- to test USING vs ON equivalence
+EXPLAIN (costs off)
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 ON (d.id = u1.id)
+JOIN sje_d2 u2 ON (u1.id = u2.id);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (d.id = u2.id)
+                           ->  Seq Scan on sje_d1_102012 d
+                           ->  Hash
+                                 ->  Seq Scan on sje_d2_102016 u2
+(12 rows)
+
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 ON (d.id = u1.id)
+JOIN sje_d2 u2 ON (u1.id = u2.id);
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- Null-introducing join can have SJE
+EXPLAIN (costs off)
+SELECT count(*)
+FROM sje_d1 d
+LEFT JOIN sje_d2 u1 USING (id)
+LEFT JOIN sje_d2 u2 USING (id);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Seq Scan on sje_d1_102012 d
+(8 rows)
+
+SELECT count(*)
+FROM sje_d1 d
+LEFT JOIN sje_d2 u1 USING (id)
+LEFT JOIN sje_d2 u2 USING (id);
+ count
+---------------------------------------------------------------------
+   101
+(1 row)
+
+-- prepared statement
+PREPARE sje_p(int,int) AS
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 USING (id)
+JOIN sje_d2 u2 USING (id)
+WHERE d.id BETWEEN $1 AND $2;
+EXPLAIN (costs off)
+EXECUTE sje_p(10,20);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (u2.id = d.id)
+                           ->  Seq Scan on sje_d2_102016 u2
+                           ->  Hash
+                                 ->  Bitmap Heap Scan on sje_d1_102012 d
+                                       Recheck Cond: ((id >= 10) AND (id <= 20))
+                                       ->  Bitmap Index Scan on sje_d1_pkey_102012
+                                             Index Cond: ((id >= 10) AND (id <= 20))
+(15 rows)
+
+EXECUTE sje_p(10,20);
+ count
+---------------------------------------------------------------------
+    11
+(1 row)
+
+-- cte
+EXPLAIN (costs off)
+WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
+SELECT count(1)
+FROM sje_d1 d
+JOIN z USING (id)
+JOIN sje_d2 u2 USING (id);
+                              QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 4
+         Tasks Shown: One of 4
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: (d.id = u2.id)
+                           ->  Seq Scan on sje_d1_102012 d
+                           ->  Hash
+                                 ->  Seq Scan on sje_d2_102016 u2
+                                       Filter: ((id % '2'::bigint) = 0)
+(13 rows)
+
+WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
+SELECT count(1)
+FROM sje_d1 d
+JOIN z USING (id)
+JOIN sje_d2 u2 USING (id);
+ count
+---------------------------------------------------------------------
+    51
+(1 row)
+
+-- cleanup with minimum verbosity
+SET client_min_messages TO ERROR;
 RESET search_path;
 DROP SCHEMA pg18_nn CASCADE;
-NOTICE: drop cascades to 2 other objects
-DETAIL: drop cascades to table pg18_nn.nn_local
-drop cascades to table pg18_nn.nn_dist
+RESET client_min_messages;
@@ -768,7 +768,7 @@ WHERE
 
 RESET client_min_messages;
 
--- some tests for mark_tables_colocated
+-- some tests for update_distributed_table_colocation
 -- should error out
 SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
 
@@ -134,7 +134,119 @@ WHERE conrelid = 'pg18_nn.nn_dist'::regclass
 GROUP BY contype
 ORDER BY contype;
 
--- cleanup
-RESET client_min_messages;
+-- Purpose: test self join elimination for distributed, citus local and local tables.
+--
+CREATE TABLE sje_d1 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
+CREATE TABLE sje_d2 (id bigserial PRIMARY KEY, name text, created_at timestamptz DEFAULT now());
+CREATE TABLE sje_local (id bigserial PRIMARY KEY, title text);
+
+SELECT create_distributed_table('sje_d1', 'id');
+SELECT create_distributed_table('sje_d2', 'id');
+
+INSERT INTO sje_d1 SELECT i, i::text, now() FROM generate_series(0,100)i;
+INSERT INTO sje_d2 SELECT i, i::text, now() FROM generate_series(0,100)i;
+INSERT INTO sje_local SELECT i, i::text FROM generate_series(0,100)i;
+
+-- Self-join elimination is applied when distributed tables are involved
+-- The query plan has only one join
+EXPLAIN (costs off)
+select count(1) from sje_d1 INNER
+JOIN sje_d2 u1 USING (id) INNER
+JOIN sje_d2 u2 USING (id) INNER
+JOIN sje_d2 u3 USING (id) INNER
+JOIN sje_d2 u4 USING (id) INNER
+JOIN sje_d2 u5 USING (id) INNER
+JOIN sje_d2 u6 USING (id);
+
+select count(1) from sje_d1 INNER
+JOIN sje_d2 u1 USING (id) INNER
+JOIN sje_d2 u2 USING (id) INNER
+JOIN sje_d2 u3 USING (id) INNER
+JOIN sje_d2 u4 USING (id) INNER
+JOIN sje_d2 u5 USING (id) INNER
+JOIN sje_d2 u6 USING (id);
+
+-- Self-join elimination applied to from list join
+EXPLAIN (costs off)
+SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
+WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
+
+SELECT count(1) from sje_d1 d1, sje_d2 u1, sje_d2 u2, sje_d2 u3
+WHERE d1.id = u1.id and u1.id = u2.id and u3.id = d1.id;
+
+-- Self-join elimination is not applied when a local table is involved
+-- This is a limitation that will be resolved in citus 14
+EXPLAIN (costs off)
+select count(1) from sje_d1 INNER
+JOIN sje_local u1 USING (id) INNER
+JOIN sje_local u2 USING (id) INNER
+JOIN sje_local u3 USING (id) INNER
+JOIN sje_local u4 USING (id) INNER
+JOIN sje_local u5 USING (id) INNER
+JOIN sje_local u6 USING (id);
+
+select count(1) from sje_d1 INNER
+JOIN sje_local u1 USING (id) INNER
+JOIN sje_local u2 USING (id) INNER
+JOIN sje_local u3 USING (id) INNER
+JOIN sje_local u4 USING (id) INNER
+JOIN sje_local u5 USING (id) INNER
+JOIN sje_local u6 USING (id);
+
+-- to test USING vs ON equivalence
+EXPLAIN (costs off)
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 ON (d.id = u1.id)
+JOIN sje_d2 u2 ON (u1.id = u2.id);
+
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 ON (d.id = u1.id)
+JOIN sje_d2 u2 ON (u1.id = u2.id);
+
+-- Null-introducing join can have SJE
+EXPLAIN (costs off)
+SELECT count(*)
+FROM sje_d1 d
+LEFT JOIN sje_d2 u1 USING (id)
+LEFT JOIN sje_d2 u2 USING (id);
+
+SELECT count(*)
+FROM sje_d1 d
+LEFT JOIN sje_d2 u1 USING (id)
+LEFT JOIN sje_d2 u2 USING (id);
+
+-- prepared statement
+PREPARE sje_p(int,int) AS
+SELECT count(1)
+FROM sje_d1 d
+JOIN sje_d2 u1 USING (id)
+JOIN sje_d2 u2 USING (id)
+WHERE d.id BETWEEN $1 AND $2;
+
+EXPLAIN (costs off)
+EXECUTE sje_p(10,20);
+
+EXECUTE sje_p(10,20);
+
+-- cte
+EXPLAIN (costs off)
+WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
+SELECT count(1)
+FROM sje_d1 d
+JOIN z USING (id)
+JOIN sje_d2 u2 USING (id);
+
+WITH z AS (SELECT id FROM sje_d2 WHERE id % 2 = 0)
+SELECT count(1)
+FROM sje_d1 d
+JOIN z USING (id)
+JOIN sje_d2 u2 USING (id);
+
+-- cleanup with minimum verbosity
+SET client_min_messages TO ERROR;
 RESET search_path;
 DROP SCHEMA pg18_nn CASCADE;
+RESET client_min_messages;