Refactor insert select planner for improved identity column handling and enhance regression tests

m3hm3t/issue_7887
Mehmet Yilmaz 2025-03-05 07:14:40 +00:00
parent 789bfbbf54
commit c865d6323c
2 changed files with 78 additions and 72 deletions

View File

@ -1071,8 +1071,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
TargetEntry *newSubqueryTargetEntry = NULL; TargetEntry *newSubqueryTargetEntry = NULL;
AttrNumber originalAttrNo = get_attnum(insertRelationId, AttrNumber originalAttrNo = get_attnum(insertRelationId,
oldInsertTargetEntry->resname); oldInsertTargetEntry->resname);
/* see transformInsertRow() for the details */ /* see transformInsertRow() for the details */
if (IsA(oldInsertTargetEntry->expr, SubscriptingRef) || if (IsA(oldInsertTargetEntry->expr, SubscriptingRef) ||
@ -1112,19 +1111,21 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
else else
{ {
/* /*
* Check if the target column is an identity column and whether the query did NOT * Check if the target column is an identity column and whether the query did NOT
* specify OVERRIDING SYSTEM VALUE. If both conditions are true, we need to consider * specify OVERRIDING SYSTEM VALUE. If both conditions are true, we need to consider
* generating a default sequence value. * generating a default sequence value.
*/ */
if (IsIdentityColumn(insertRelationId, oldInsertTargetEntry->resname) && originalQuery->override != OVERRIDING_SYSTEM_VALUE) if (IsIdentityColumn(insertRelationId, oldInsertTargetEntry->resname) &&
originalQuery->override != OVERRIDING_SYSTEM_VALUE)
{ {
/* /*
* Open the target relation (table) with an AccessShareLock to safely access metadata, * Open the target relation (table) with an AccessShareLock to safely access metadata,
* such as the identity sequence. * such as the identity sequence.
*/ */
Relation targetRel = table_open(insertRelationId, AccessShareLock); Relation targetRel = table_open(insertRelationId, AccessShareLock);
AttrNumber attrNum = get_attnum(insertRelationId, oldInsertTargetEntry->resname); AttrNumber attrNum = get_attnum(insertRelationId,
oldInsertTargetEntry->resname);
bool missingOk = false; bool missingOk = false;
Oid seqOid = getIdentitySequence(targetRel, attrNum, missingOk); Oid seqOid = getIdentitySequence(targetRel, attrNum, missingOk);
@ -1137,23 +1138,23 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
/* Build an expression tree that represents: nextval('sequence_oid'::regclass) */ /* Build an expression tree that represents: nextval('sequence_oid'::regclass) */
Expr *defaultExpr = MakeNextValExprForIdentity(seqOid); Expr *defaultExpr = MakeNextValExprForIdentity(seqOid);
/* Create a new target entry that uses the default expression to generate the next sequence value */ /* Create a new target entry that uses the default expression to generate the next sequence value */
newSubqueryTargetEntry = makeTargetEntry( newSubqueryTargetEntry = makeTargetEntry(
defaultExpr, defaultExpr,
resno, resno,
oldInsertTargetEntry->resname, oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk oldInsertTargetEntry->resjunk
); );
table_close(targetRel, AccessShareLock); table_close(targetRel, AccessShareLock);
} }
else else
{ {
/* /*
* For non-identity columns, or if the query used OVERRIDING SYSTEM VALUE, * For non-identity columns, or if the query used OVERRIDING SYSTEM VALUE,
* use the provided expression without modification. * use the provided expression without modification.
*/ */
newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr, newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
resno, resno,
oldInsertTargetEntry->resname, oldInsertTargetEntry->resname,
@ -1233,36 +1234,35 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
Expr * Expr *
MakeNextValExprForIdentity(Oid seq_relid) MakeNextValExprForIdentity(Oid seq_relid)
{ {
Const *seq_const; List *func_args;
List *func_args; Oid nextval_oid;
Oid nextval_oid;
seq_const = makeConst( Const *seq_const = makeConst(
REGCLASSOID, /* type for regclass */ REGCLASSOID, /* type for regclass */
-1, /* no specific collation */ -1, /* no specific collation */
InvalidOid, /* default collation */ InvalidOid, /* default collation */
sizeof(Oid), /* size of the Oid */ sizeof(Oid), /* size of the Oid */
ObjectIdGetDatum(seq_relid), ObjectIdGetDatum(seq_relid),
false, /* not null */ false, /* not null */
true /* pass by value */ true /* pass by value */
); );
func_args = list_make1(seq_const); func_args = list_make1(seq_const);
nextval_oid = LookupFuncName( nextval_oid = LookupFuncName(
list_make1(makeString("nextval")), list_make1(makeString("nextval")),
1, 1,
(Oid[]){ REGCLASSOID }, (Oid[]) { REGCLASSOID },
false false
); );
return (Expr *) makeFuncExpr( return (Expr *) makeFuncExpr(
nextval_oid, /* OID of nextval() */ nextval_oid, /* OID of nextval() */
INT8OID, /* nextval returns int8 */ INT8OID, /* nextval returns int8 */
func_args, /* arguments for nextval() */ func_args, /* arguments for nextval() */
InvalidOid, /* no specific collation */ InvalidOid, /* no specific collation */
InvalidOid, InvalidOid,
COERCE_EXPLICIT_CALL COERCE_EXPLICIT_CALL
); );
} }
@ -1272,36 +1272,42 @@ MakeNextValExprForIdentity(Oid seq_relid)
bool bool
IsIdentityColumn(Oid relid, const char *colName) IsIdentityColumn(Oid relid, const char *colName)
{ {
/* Check if colName is non-null (optional, if colName can be NULL) */ /* Check if colName is non-null (optional, if colName can be NULL) */
if (colName == NULL) if (colName == NULL)
return false; {
return false;
}
/* Get the attribute number for the given column name */ /* Get the attribute number for the given column name */
AttrNumber attrNum = get_attnum(relid, colName); AttrNumber attrNum = get_attnum(relid, colName);
if (attrNum == InvalidAttrNumber) if (attrNum == InvalidAttrNumber)
return false; {
return false;
}
/* Open the relation to access its metadata */ /* Open the relation to access its metadata */
Relation rel = RelationIdGetRelation(relid); Relation rel = RelationIdGetRelation(relid);
if (!RelationIsValid(rel)) if (!RelationIsValid(rel))
return false; {
return false;
}
/* Ensure the attribute number is within the valid range */ /* Ensure the attribute number is within the valid range */
if (attrNum <= 0 || attrNum > rel->rd_att->natts) if (attrNum <= 0 || attrNum > rel->rd_att->natts)
{ {
RelationClose(rel); RelationClose(rel);
return false; return false;
} }
/* Fetch the attribute metadata (attributes are 0-indexed) */ /* Fetch the attribute metadata (attributes are 0-indexed) */
Form_pg_attribute attr = TupleDescAttr(rel->rd_att, attrNum - 1); Form_pg_attribute attr = TupleDescAttr(rel->rd_att, attrNum - 1);
/* Determine if the column is defined as an identity column */ /* Determine if the column is defined as an identity column */
bool is_identity = (attr->attidentity == ATTRIBUTE_IDENTITY_ALWAYS || bool is_identity = (attr->attidentity == ATTRIBUTE_IDENTITY_ALWAYS ||
attr->attidentity == ATTRIBUTE_IDENTITY_BY_DEFAULT); attr->attidentity == ATTRIBUTE_IDENTITY_BY_DEFAULT);
RelationClose(rel); RelationClose(rel);
return is_identity; return is_identity;
} }

View File

@ -35,7 +35,7 @@ INSERT INTO local2(local1fk, reference1fk)
SELECT * FROM local2; SELECT * FROM local2;
-- We do a "INSERT INTO local2(id, local1fk, reference1fk) SELECT 9999, id, 2" which -- We do a "INSERT INTO local2(id, local1fk, reference1fk) SELECT 9999, id, 2" which
-- should fail under normal PG rules if no OVERRIDING clause is used. -- should fail under normal PG rules if no OVERRIDING clause is used.
INSERT INTO local2(id, local1fk, reference1fk) INSERT INTO local2(id, local1fk, reference1fk)
@ -66,7 +66,7 @@ SELECT * FROM local2_bydefault;
-- --
-- Overriding a BY DEFAULT identity with user value -- Overriding a BY DEFAULT identity with user value
-- (which is allowed even without OVERRIDING clause). -- (which is allowed even without OVERRIDING clause).
-- --
-- Provide explicit id for BY DEFAULT identity => no special OVERRIDING needed -- Provide explicit id for BY DEFAULT identity => no special OVERRIDING needed
@ -89,4 +89,4 @@ UNION ALL
SELECT 'local2_bydefault', * FROM local2_bydefault SELECT 'local2_bydefault', * FROM local2_bydefault
ORDER BY table_name, id; ORDER BY table_name, id;
DROP SCHEMA issue_7887 CASCADE; DROP SCHEMA issue_7887 CASCADE;