Add identity column handling functions and enhance regression test for issue 7887

pull/7920/head
Mehmet Yilmaz 2025-03-04 16:22:47 +00:00
parent 4a2bc115dd
commit 56547f72d1
4 changed files with 316 additions and 83 deletions

View File

@ -1071,6 +1071,8 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
TargetEntry *newSubqueryTargetEntry = NULL;
AttrNumber originalAttrNo = get_attnum(insertRelationId,
oldInsertTargetEntry->resname);
/* see transformInsertRow() for the details */
if (IsA(oldInsertTargetEntry->expr, SubscriptingRef) ||
@ -1109,14 +1111,61 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
}
else
{
newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
resno,
oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk);
/*
* Check if the target column is an identity column and whether the query did NOT
* specify OVERRIDING SYSTEM VALUE. If both conditions are true, we need to consider
* generating a default sequence value.
*/
if (IsIdentityColumn(insertRelationId, oldInsertTargetEntry->resname) && originalQuery->override != OVERRIDING_SYSTEM_VALUE)
{
/*
* Open the target relation (table) with an AccessShareLock to safely access metadata,
* such as the identity sequence.
*/
Relation targetRel = table_open(insertRelationId, AccessShareLock);
AttrNumber attrNum = get_attnum(insertRelationId, oldInsertTargetEntry->resname);
bool missingOk = false;
Oid seqOid = getIdentitySequence(targetRel, attrNum, missingOk);
if (!OidIsValid(seqOid))
{
table_close(targetRel, AccessShareLock);
elog(ERROR, "could not find identity sequence for relation %u col %s",
insertRelationId, oldInsertTargetEntry->resname);
}
/* Build an expression tree that represents: nextval('sequence_oid'::regclass) */
Expr *defaultExpr = MakeNextValExprForIdentity(seqOid);
/* Create a new target entry that uses the default expression to generate the next sequence value */
newSubqueryTargetEntry = makeTargetEntry(
defaultExpr,
resno,
oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk
);
table_close(targetRel, AccessShareLock);
}
else
{
/*
* For non-identity columns, or if the query used OVERRIDING SYSTEM VALUE,
* use the provided expression without modification.
*/
newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
resno,
oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk);
}
/* Append the new target entry to the subquery's target list */
newSubqueryTargetlist = lappend(newSubqueryTargetlist,
newSubqueryTargetEntry);
}
String *columnName = makeString(newSubqueryTargetEntry->resname);
columnNameList = lappend(columnNameList, columnName);
@ -1177,6 +1226,85 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
}
/*
* MakeNextValExprForIdentity creates an expression that generates the next value
* from the specified sequence, which is used for identity columns.
*/
Expr *
MakeNextValExprForIdentity(Oid seq_relid)
{
Const *seq_const;
List *func_args;
Oid nextval_oid;
seq_const = makeConst(
REGCLASSOID, /* type for regclass */
-1, /* no specific collation */
InvalidOid, /* default collation */
sizeof(Oid), /* size of the Oid */
ObjectIdGetDatum(seq_relid),
false, /* not null */
true /* pass by value */
);
func_args = list_make1(seq_const);
nextval_oid = LookupFuncName(
list_make1(makeString("nextval")),
1,
(Oid[]){ REGCLASSOID },
false
);
return (Expr *) makeFuncExpr(
nextval_oid, /* OID of nextval() */
INT8OID, /* nextval returns int8 */
func_args, /* arguments for nextval() */
InvalidOid, /* no specific collation */
InvalidOid,
COERCE_EXPLICIT_CALL
);
}
/*
* Checks whether a given column in the specified relation is an identity column.
*/
bool
IsIdentityColumn(Oid relid, const char *colName)
{
/* Check if colName is non-null (optional, if colName can be NULL) */
if (colName == NULL)
return false;
/* Get the attribute number for the given column name */
AttrNumber attrNum = get_attnum(relid, colName);
if (attrNum == InvalidAttrNumber)
return false;
/* Open the relation to access its metadata */
Relation rel = RelationIdGetRelation(relid);
if (!RelationIsValid(rel))
return false;
/* Ensure the attribute number is within the valid range */
if (attrNum <= 0 || attrNum > rel->rd_att->natts)
{
RelationClose(rel);
return false;
}
/* Fetch the attribute metadata (attributes are 0-indexed) */
Form_pg_attribute attr = TupleDescAttr(rel->rd_att, attrNum - 1);
/* Determine if the column is defined as an identity column */
bool is_identity = (attr->attidentity == ATTRIBUTE_IDENTITY_ALWAYS ||
attr->attidentity == ATTRIBUTE_IDENTITY_BY_DEFAULT);
RelationClose(rel);
return is_identity;
}
/*
* InsertPartitionColumnMatchesSelect returns NULL the partition column in the
* table targeted by INSERTed matches with the any of the SELECTed table's
@ -1438,80 +1566,6 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
/*
* 1) Open the target relation to inspect its attributes and detect identity columns.
*/
Relation targetRel = RelationIdGetRelation(targetRelationId);
if (RelationIsValid(targetRel))
{
ListCell *lcInsert = NULL;
// ListCell *lcSelect = list_head(selectQuery->targetList);
/* We'll build new lists for both sides */
List *newInsertTList = NIL;
List *newSelectTList = NIL;
int insertIndex = 0;
foreach(lcInsert, insertSelectQuery->targetList)
{
TargetEntry *insertTle = (TargetEntry *) lfirst(lcInsert);
insertIndex++;
/* Get the corresponding TLE from the SELECT by position or resno */
TargetEntry *selectTle = NULL;
/*
* If your plan is guaranteed to keep them in the same order, you can
* do "selectTle = (TargetEntry *) list_nth(selectQuery->targetList, insertIndex - 1)".
*
* Alternatively, if you rely on resno alignment, you'd find the TLE with resno==insertTle->resno.
* For simplicity, let's assume same ordering:
*/
selectTle = (TargetEntry *) list_nth(selectQuery->targetList, insertIndex - 1);
/*
* Check if the insertTle is an identity column that the user didn't supply,
* e.g. by checking 'attr->attidentity == ATTRIBUTE_IDENTITY_ALWAYS' etc.
* If we skip it, also skip the SELECT TLE at the same position.
*/
int attno = insertTle->resno;
if (attno > 0 && attno <= targetRel->rd_att->natts)
{
Form_pg_attribute attr = TupleDescAttr(targetRel->rd_att, attno - 1);
/*
* If 'attr->attidentity' is 'a' or 'd' => It's an identity column.
* If the user hasn't explicitly specified a value (which is presumably
* indicated by something in the parse tree?), we remove or convert
* the TLE to a default.
*/
// bool userSpecifiedValue = CheckIfUserSpecifiedValue(tle, parse);
bool userSpecifiedValue = false;
if ((attr->attidentity == ATTRIBUTE_IDENTITY_ALWAYS ||
attr->attidentity == ATTRIBUTE_IDENTITY_BY_DEFAULT) &&
!userSpecifiedValue)
{
/* Skip adding TLE => effectively uses default identity generation */
continue;
}
}
/* else keep both TLEs */
newInsertTList = lappend(newInsertTList, insertTle);
newSelectTList = lappend(newSelectTList, selectTle);
}
/* Now we have 1:1 matching lists with the identity column removed from both sides */
insertSelectQuery->targetList = newInsertTList;
selectQuery->targetList = newSelectTList;
RelationClose(targetRel);
}
/*
* Later we might need to call WrapTaskListForProjection(), which requires
* that select target list has unique names, otherwise the outer query

View File

@ -46,6 +46,8 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
extern char * InsertSelectResultIdPrefix(uint64 planId);
extern bool PlanningInsertSelect(void);
extern Query * WrapSubquery(Query *subquery);
extern bool IsIdentityColumn(Oid relid, const char *colName);
extern Expr * MakeNextValExprForIdentity(Oid seq_relid);
#endif /* INSERT_SELECT_PLANNER_H */

View File

@ -0,0 +1,118 @@
CREATE SCHEMA issue_7887;
SET search_path to 'issue_7887';
CREATE TABLE local1 (
id text not null primary key
);
CREATE TABLE reference1 (
id int not null primary key,
reference_col1 text not null
);
SELECT create_reference_table('reference1');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE local2 (
id int not null generated always as identity,
local1fk text not null,
reference1fk int not null,
constraint loc1fk foreign key (local1fk) references local1(id),
constraint reference1fk foreign key (reference1fk) references reference1(id),
constraint testlocpk primary key (id)
);
INSERT INTO local1(id) VALUES ('aaaaa'), ('bbbbb'), ('ccccc');
INSERT INTO reference1(id, reference_col1) VALUES (1, 'test'), (2, 'test2'), (3, 'test3');
--
-- Partial insert: omit the identity column
-- This triggers the known bug in older code paths if not fixed.
--
INSERT INTO local2(local1fk, reference1fk)
SELECT id, 1
FROM local1;
-- Check inserted rows in local2
SELECT * FROM local2;
id | local1fk | reference1fk
---------------------------------------------------------------------
1 | aaaaa | 1
2 | bbbbb | 1
3 | ccccc | 1
(3 rows)
-- We do a "INSERT INTO local2(id, local1fk, reference1fk) SELECT 9999, id, 2" which
-- should fail under normal PG rules if no OVERRIDING clause is used.
INSERT INTO local2(id, local1fk, reference1fk)
SELECT 9999, id, 2 FROM local1 LIMIT 1;
ERROR: cannot insert a non-DEFAULT value into column "id"
DETAIL: Column "id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
-- Using OVERRIDING SYSTEM VALUE to override ALWAYS identity
INSERT INTO local2(id, local1fk, reference1fk)
OVERRIDING SYSTEM VALUE
SELECT 9999, id, 2 FROM local1 LIMIT 1;
-- Create a second table with BY DEFAULT identity to test different identity mode
CREATE TABLE local2_bydefault (
id int NOT NULL GENERATED BY DEFAULT AS IDENTITY,
local1fk text NOT NULL,
reference1fk int NOT NULL,
CONSTRAINT loc1fk_bd FOREIGN KEY (local1fk) REFERENCES local1(id),
CONSTRAINT reference1fk_bd FOREIGN KEY (reference1fk) REFERENCES reference1(id),
CONSTRAINT testlocpk_bd PRIMARY KEY (id)
);
INSERT INTO local1(id) VALUES ('xxxxx'), ('yyyyy'), ('ddddd'), ('zzzzz');
INSERT INTO local2_bydefault(local1fk, reference1fk)
SELECT 'xxxxx', 1;
-- Show inserted row in local2_bydefault
SELECT * FROM local2_bydefault;
id | local1fk | reference1fk
---------------------------------------------------------------------
1 | xxxxx | 1
(1 row)
--
-- Overriding a BY DEFAULT identity with user value
-- (which is allowed even without OVERRIDING clause).
--
-- Provide explicit id for BY DEFAULT identity => no special OVERRIDING needed
INSERT INTO local2_bydefault(id, local1fk, reference1fk)
VALUES (5000, 'yyyyy', 2);
-- Show rows (we expect id=5000 and one with auto-generated ID)
SELECT * FROM local2_bydefault ORDER BY id;
id | local1fk | reference1fk
---------------------------------------------------------------------
1 | xxxxx | 1
5000 | yyyyy | 2
(2 rows)
-- Insert referencing reference1fk=3 => partial insert on both tables
INSERT INTO local2(local1fk, reference1fk)
VALUES ('ddddd', 3);
INSERT INTO local2_bydefault(local1fk, reference1fk)
SELECT 'zzzzz', 3;
-- Show final state of local2 and local2_bydefault
SELECT 'local2' as table_name, * FROM local2
UNION ALL
SELECT 'local2_bydefault', * FROM local2_bydefault
ORDER BY table_name, id;
table_name | id | local1fk | reference1fk
---------------------------------------------------------------------
local2 | 1 | aaaaa | 1
local2 | 2 | bbbbb | 1
local2 | 3 | ccccc | 1
local2 | 4 | ddddd | 3
local2 | 9999 | aaaaa | 2
local2_bydefault | 1 | xxxxx | 1
local2_bydefault | 2 | zzzzz | 3
local2_bydefault | 5000 | yyyyy | 2
(8 rows)
DROP SCHEMA issue_7887 CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table local1_102010
drop cascades to table reference1
drop cascades to table reference1_102008
drop cascades to table local2_102009
drop cascades to table local2
drop cascades to table local1
drop cascades to table local2_bydefault_102011
drop cascades to table local2_bydefault

View File

@ -1,5 +1,5 @@
CREATE SCHEMA issue_7887;
CREATE SCHEMA issue_7887;
SET search_path to 'issue_7887';
CREATE TABLE local1 (
id text not null primary key
@ -23,11 +23,70 @@ CREATE TABLE local2 (
INSERT INTO local1(id) VALUES ('aaaaa'), ('bbbbb'), ('ccccc');
INSERT INTO reference1(id, reference_col1) VALUES (1, 'test'), (2, 'test2'), (3, 'test3');
-- The statement that triggers the bug:
--
-- Partial insert: omit the identity column
-- This triggers the known bug in older code paths if not fixed.
--
INSERT INTO local2(local1fk, reference1fk)
SELECT id, 1
FROM local1;
-- If you want to see the error in the regression output, you might do something like:
-- NOTE: The next line is typically how you'd test for an error in a .sql regression test
-- but with a custom "expected" file you'll confirm you get the "invalid string enlargement request size: -4" text
-- Check inserted rows in local2
SELECT * FROM local2;
-- We do a "INSERT INTO local2(id, local1fk, reference1fk) SELECT 9999, id, 2" which
-- should fail under normal PG rules if no OVERRIDING clause is used.
INSERT INTO local2(id, local1fk, reference1fk)
SELECT 9999, id, 2 FROM local1 LIMIT 1;
-- Using OVERRIDING SYSTEM VALUE to override ALWAYS identity
INSERT INTO local2(id, local1fk, reference1fk)
OVERRIDING SYSTEM VALUE
SELECT 9999, id, 2 FROM local1 LIMIT 1;
-- Create a second table with BY DEFAULT identity to test different identity mode
CREATE TABLE local2_bydefault (
id int NOT NULL GENERATED BY DEFAULT AS IDENTITY,
local1fk text NOT NULL,
reference1fk int NOT NULL,
CONSTRAINT loc1fk_bd FOREIGN KEY (local1fk) REFERENCES local1(id),
CONSTRAINT reference1fk_bd FOREIGN KEY (reference1fk) REFERENCES reference1(id),
CONSTRAINT testlocpk_bd PRIMARY KEY (id)
);
INSERT INTO local1(id) VALUES ('xxxxx'), ('yyyyy'), ('ddddd'), ('zzzzz');
INSERT INTO local2_bydefault(local1fk, reference1fk)
SELECT 'xxxxx', 1;
-- Show inserted row in local2_bydefault
SELECT * FROM local2_bydefault;
--
-- Overriding a BY DEFAULT identity with user value
-- (which is allowed even without OVERRIDING clause).
--
-- Provide explicit id for BY DEFAULT identity => no special OVERRIDING needed
INSERT INTO local2_bydefault(id, local1fk, reference1fk)
VALUES (5000, 'yyyyy', 2);
-- Show rows (we expect id=5000 and one with auto-generated ID)
SELECT * FROM local2_bydefault ORDER BY id;
-- Insert referencing reference1fk=3 => partial insert on both tables
INSERT INTO local2(local1fk, reference1fk)
VALUES ('ddddd', 3);
INSERT INTO local2_bydefault(local1fk, reference1fk)
SELECT 'zzzzz', 3;
-- Show final state of local2 and local2_bydefault
SELECT 'local2' as table_name, * FROM local2
UNION ALL
SELECT 'local2_bydefault', * FROM local2_bydefault
ORDER BY table_name, id;
DROP SCHEMA issue_7887 CASCADE;