mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into fix-collation-mismatch-create-dist-table
commit
7f106c8ff0
|
|
@ -428,11 +428,10 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
|
||||||
ParamListInfo boundParams, bool hasUnresolvedParams,
|
ParamListInfo boundParams, bool hasUnresolvedParams,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext)
|
||||||
{
|
{
|
||||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
|
||||||
|
|
||||||
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
||||||
|
|
||||||
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
||||||
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
Query *selectQuery = selectRte->subquery;
|
Query *selectQuery = selectRte->subquery;
|
||||||
|
|
||||||
bool allowRecursivePlanning = true;
|
bool allowRecursivePlanning = true;
|
||||||
|
|
@ -513,6 +512,24 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
|
||||||
|
|
||||||
bool isWrapped = false;
|
bool isWrapped = false;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PG18 is stricter about GroupRTE/GroupVar. For INSERT … SELECT with a GROUP BY,
|
||||||
|
* flatten the SELECT’s targetList and havingQual so Vars point to base RTEs and
|
||||||
|
* avoid Unrecognized range table id.
|
||||||
|
*/
|
||||||
|
if (selectRte->subquery->hasGroupRTE)
|
||||||
|
{
|
||||||
|
Query *selectQuery = selectRte->subquery;
|
||||||
|
selectQuery->targetList = (List *)
|
||||||
|
flatten_group_exprs(NULL, selectQuery,
|
||||||
|
(Node *) selectQuery->targetList);
|
||||||
|
selectQuery->havingQual =
|
||||||
|
flatten_group_exprs(NULL, selectQuery, selectQuery->havingQual);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (selectRte->subquery->setOperations != NULL)
|
if (selectRte->subquery->setOperations != NULL)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
|
@ -1431,11 +1448,6 @@ static DistributedPlan *
|
||||||
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
|
CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
|
||||||
{
|
{
|
||||||
Query *insertSelectQuery = copyObject(parse);
|
Query *insertSelectQuery = copyObject(parse);
|
||||||
|
|
||||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
|
||||||
Oid targetRelationId = insertRte->relid;
|
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||||
distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
|
distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
|
||||||
|
|
||||||
|
|
@ -1450,6 +1462,7 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
||||||
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
PrepareInsertSelectForCitusPlanner(insertSelectQuery);
|
||||||
|
|
||||||
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
|
||||||
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
|
||||||
Query *selectQuery = selectRte->subquery;
|
Query *selectQuery = selectRte->subquery;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -1472,6 +1485,9 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
|
||||||
PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, NULL, cursorOptions,
|
PlannedStmt *selectPlan = pg_plan_query(selectQueryCopy, NULL, cursorOptions,
|
||||||
boundParams);
|
boundParams);
|
||||||
|
|
||||||
|
/* decide whether we can repartition the results */
|
||||||
|
RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
|
||||||
|
Oid targetRelationId = insertRte->relid;
|
||||||
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
|
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
|
||||||
IsSupportedRedistributionTarget(targetRelationId);
|
IsSupportedRedistributionTarget(targetRelationId);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -73,34 +73,8 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation);
|
||||||
Datum
|
Datum
|
||||||
mark_tables_colocated(PG_FUNCTION_ARGS)
|
mark_tables_colocated(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
CheckCitusVersion(ERROR);
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
EnsureCoordinator();
|
errmsg("this function is deprecated and no longer is used")));
|
||||||
|
|
||||||
Oid sourceRelationId = PG_GETARG_OID(0);
|
|
||||||
ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
|
|
||||||
|
|
||||||
int relationCount = ArrayObjectCount(relationIdArrayObject);
|
|
||||||
if (relationCount < 1)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("at least one target table is required for this "
|
|
||||||
"operation")));
|
|
||||||
}
|
|
||||||
|
|
||||||
EnsureTableOwner(sourceRelationId);
|
|
||||||
|
|
||||||
Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
|
|
||||||
|
|
||||||
for (int relationIndex = 0; relationIndex < relationCount; relationIndex++)
|
|
||||||
{
|
|
||||||
Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
|
|
||||||
|
|
||||||
/* we require that the user either owns all tables or is superuser */
|
|
||||||
EnsureTableOwner(nextRelationOid);
|
|
||||||
|
|
||||||
MarkTablesColocated(sourceRelationId, nextRelationOid);
|
|
||||||
}
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1306,7 +1280,7 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex)
|
||||||
/*
|
/*
|
||||||
* DeleteColocationGroupIfNoTablesBelong function deletes given co-location group if there
|
* DeleteColocationGroupIfNoTablesBelong function deletes given co-location group if there
|
||||||
* is no relation in that co-location group. A co-location group may become empty after
|
* is no relation in that co-location group. A co-location group may become empty after
|
||||||
* mark_tables_colocated or upgrade_reference_table UDF calls. In that case we need to
|
* update_distributed_table_colocation UDF calls. In that case we need to
|
||||||
* remove empty co-location group to prevent orphaned co-location groups.
|
* remove empty co-location group to prevent orphaned co-location groups.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
|
|
|
||||||
|
|
@ -1200,7 +1200,7 @@ DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition
|
||||||
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
-- some tests for mark_tables_colocated
|
-- some tests for update_distributed_table_colocation
|
||||||
-- should error out
|
-- should error out
|
||||||
SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
|
SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
|
||||||
ERROR: relation reference_table_test should be a hash or single shard distributed table
|
ERROR: relation reference_table_test should be a hash or single shard distributed table
|
||||||
|
|
|
||||||
|
|
@ -768,7 +768,7 @@ WHERE
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
-- some tests for mark_tables_colocated
|
-- some tests for update_distributed_table_colocation
|
||||||
-- should error out
|
-- should error out
|
||||||
SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
|
SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue