mirror of https://github.com/citusdata/citus.git
Merge pull request #1621 from citusdata/multi_row_insert_defaults
Allow default columns in multi-row INSERTstemp_tables
commit
c68bd7efa7
|
@ -100,7 +100,6 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
|
|||
static void PrepareMasterJobDirectory(Job *workerJob);
|
||||
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||
static Relation StubRelation(TupleDesc tupleDescriptor);
|
||||
static bool IsMultiRowInsert(Query *query);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -195,20 +194,6 @@ RouterCreateScan(CustomScan *scan)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsMultiRowInsert returns whether the given query is a multi-row INSERT.
|
||||
*
|
||||
* It does this by determining whether the query is an INSERT that has an
|
||||
* RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away
|
||||
* in transformInsertStmt, and instead use the target list.
|
||||
*/
|
||||
static bool
|
||||
IsMultiRowInsert(Query *query)
|
||||
{
|
||||
return ExtractDistributedInsertValuesRTE(query) != NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CoordinatorInsertSelectCrateScan creates the scan state for executing
|
||||
* INSERT..SELECT into a distributed table via the coordinator.
|
||||
|
|
|
@ -130,6 +130,7 @@ static List * TargetShardIntervalsForRouter(Query *query,
|
|||
RelationRestrictionContext *restrictionContext,
|
||||
bool *multiShardQuery);
|
||||
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
||||
static void NormalizeMultiRowInsertTargetList(Query *query);
|
||||
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
|
||||
static List * GroupInsertValuesByShardId(List *insertValuesList);
|
||||
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
|
||||
|
@ -1081,9 +1082,25 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
|||
bool requiresMasterEvaluation = false;
|
||||
bool deferredPruning = false;
|
||||
|
||||
if (!CanShardPrune(distributedTableId, query))
|
||||
bool isMultiRowInsert = IsMultiRowInsert(query);
|
||||
if (isMultiRowInsert)
|
||||
{
|
||||
/* there is a non-constant in the partition column, cannot prune yet */
|
||||
/* add default expressions to RTE_VALUES in multi-row INSERTs */
|
||||
NormalizeMultiRowInsertTargetList(originalQuery);
|
||||
}
|
||||
|
||||
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
|
||||
{
|
||||
/*
|
||||
* If there is a non-constant (e.g. parameter, function call) in the partition
|
||||
* column of the INSERT then we defer shard pruning until the executor where
|
||||
* these values are known.
|
||||
*
|
||||
* XXX: We also defer pruning for multi-row INSERTs because of some current
|
||||
* limitations with the way multi-row INSERTs are handled. Most notably, we
|
||||
* don't evaluate functions in task->rowValuesList. Therefore we need to
|
||||
* perform function evaluation before we can run RouterInsertTaskList.
|
||||
*/
|
||||
taskList = NIL;
|
||||
deferredPruning = true;
|
||||
|
||||
|
@ -1997,6 +2014,20 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsMultiRowInsert returns whether the given query is a multi-row INSERT.
|
||||
*
|
||||
* It does this by determining whether the query is an INSERT that has an
|
||||
* RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away
|
||||
* in transformInsertStmt, and instead use the target list.
|
||||
*/
|
||||
bool
|
||||
IsMultiRowInsert(Query *query)
|
||||
{
|
||||
return ExtractDistributedInsertValuesRTE(query) != NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractDistributedInsertValuesRTE does precisely that. If the provided
|
||||
* query is not an INSERT, or if the INSERT does not have a VALUES RTE
|
||||
|
@ -2030,6 +2061,104 @@ ExtractDistributedInsertValuesRTE(Query *query)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* NormalizeMultiRowInsertTargetList ensures all elements of multi-row INSERT target
|
||||
* lists are Vars. In multi-row INSERTs, most target list entries contain a Var
|
||||
* expression pointing to a position within the values_lists field of a VALUES
|
||||
* RTE, but non-NULL default columns are handled differently. Instead of adding
|
||||
* the default expression to each row, a single expression encoding the DEFAULT
|
||||
* appears in the target list. For consistency, we move these expressions into
|
||||
* values lists and replace them with an appropriately constructed Var.
|
||||
*/
|
||||
static void
|
||||
NormalizeMultiRowInsertTargetList(Query *query)
|
||||
{
|
||||
ListCell *valuesListCell = NULL;
|
||||
ListCell *targetEntryCell = NULL;
|
||||
int targetEntryNo = 0;
|
||||
|
||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||
if (valuesRTE == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach(valuesListCell, valuesRTE->values_lists)
|
||||
{
|
||||
List *valuesList = (List *) lfirst(valuesListCell);
|
||||
Expr **valuesArray = (Expr **) PointerArrayFromList(valuesList);
|
||||
List *expandedValuesList = NIL;
|
||||
|
||||
foreach(targetEntryCell, query->targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
Expr *targetExpr = targetEntry->expr;
|
||||
|
||||
if (IsA(targetExpr, Var))
|
||||
{
|
||||
/* expression from the VALUES section */
|
||||
Var *targetListVar = (Var *) targetExpr;
|
||||
targetExpr = valuesArray[targetListVar->varattno - 1];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* copy the column's default expression */
|
||||
targetExpr = copyObject(targetExpr);
|
||||
}
|
||||
|
||||
expandedValuesList = lappend(expandedValuesList, targetExpr);
|
||||
}
|
||||
|
||||
valuesListCell->data.ptr_value = (void *) expandedValuesList;
|
||||
}
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
|
||||
/* reset coltypes, coltypmods, colcollations and rebuild them below */
|
||||
valuesRTE->coltypes = NIL;
|
||||
valuesRTE->coltypmods = NIL;
|
||||
valuesRTE->colcollations = NIL;
|
||||
#endif
|
||||
|
||||
foreach(targetEntryCell, query->targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = lfirst(targetEntryCell);
|
||||
Node *targetExprNode = (Node *) targetEntry->expr;
|
||||
Oid targetType = InvalidOid;
|
||||
int32 targetTypmod = -1;
|
||||
Oid targetColl = InvalidOid;
|
||||
Var *syntheticVar = NULL;
|
||||
|
||||
/* RTE_VALUES comes 2nd, after destination table */
|
||||
Index valuesVarno = 2;
|
||||
|
||||
targetEntryNo++;
|
||||
|
||||
targetType = exprType(targetExprNode);
|
||||
targetTypmod = exprTypmod(targetExprNode);
|
||||
targetColl = exprCollation(targetExprNode);
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
valuesRTE->coltypes = lappend_oid(valuesRTE->coltypes, targetType);
|
||||
valuesRTE->coltypmods = lappend_int(valuesRTE->coltypmods, targetTypmod);
|
||||
valuesRTE->colcollations = lappend_oid(valuesRTE->colcollations, targetColl);
|
||||
#endif
|
||||
|
||||
if (IsA(targetExprNode, Var))
|
||||
{
|
||||
Var *targetVar = (Var *) targetExprNode;
|
||||
targetVar->varattno = targetEntryNo;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* replace the original expression with a Var referencing values_lists */
|
||||
syntheticVar = makeVar(valuesVarno, targetEntryNo, targetType, targetTypmod,
|
||||
targetColl, 0);
|
||||
targetEntry->expr = (Expr *) syntheticVar;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IntersectPlacementList performs placement pruning based on matching on
|
||||
* nodeName:nodePort fields of shard placement data. We start pruning from all
|
||||
|
|
|
@ -49,6 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query);
|
|||
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
|
||||
extern bool IsMultiRowInsert(Query *query);
|
||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||
ShardInterval *shardInterval);
|
||||
|
||||
|
|
|
@ -687,15 +687,14 @@ create_distributed_table
|
|||
step s1-initialize: COPY insert_hash FROM PROGRAM 'echo 0, a\\n1, b\\n2, c\\n3, d\\n4, e' WITH CSV;
|
||||
step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0;
|
||||
step s1-begin: BEGIN;
|
||||
WARNING: INSERT has more target columns than expressions
|
||||
step s1-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm');
|
||||
ERROR: could not modify any active placements
|
||||
step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column;
|
||||
step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column; <waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
step s2-ddl-drop-column: <... completed>
|
||||
step s1-select-count: SELECT COUNT(*) FROM insert_hash;
|
||||
count
|
||||
|
||||
5
|
||||
8
|
||||
step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1');
|
||||
run_command_on_workers
|
||||
|
||||
|
@ -897,13 +896,11 @@ step s1-begin: BEGIN;
|
|||
step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0;
|
||||
step s2-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm'); <waiting ...>
|
||||
step s1-commit: COMMIT;
|
||||
WARNING: INSERT has more target columns than expressions
|
||||
step s2-insert-multi-row: <... completed>
|
||||
error in steps s1-commit s2-insert-multi-row: ERROR: could not modify any active placements
|
||||
step s1-select-count: SELECT COUNT(*) FROM insert_hash;
|
||||
count
|
||||
|
||||
5
|
||||
8
|
||||
step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1');
|
||||
run_command_on_workers
|
||||
|
||||
|
|
|
@ -690,6 +690,182 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *
|
|||
3 | 103 | Mynt
|
||||
(1 row)
|
||||
|
||||
-- Test multi-row insert with serial in the partition column
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
5 | 105 | Mynt
|
||||
4 | 104 | Wayz
|
||||
(2 rows)
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *;
|
||||
id | app_id | name
|
||||
-----+--------+------
|
||||
6 | | Foo
|
||||
300 | | Wah
|
||||
(2 rows)
|
||||
|
||||
PREPARE prep(varchar) AS
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *;
|
||||
EXECUTE prep('version-1');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
7 | | version-1.1
|
||||
400 | | version-1.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-2');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
8 | | version-2.1
|
||||
400 | | version-2.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-3');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-3.2
|
||||
9 | | version-3.1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-4');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
10 | | version-4.1
|
||||
400 | | version-4.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-5');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-5.2
|
||||
11 | | version-5.1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-6');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-6.2
|
||||
12 | | version-6.1
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id, name;
|
||||
id | app_id | name
|
||||
-----+--------+-----------------
|
||||
1 | 101 | Fauxkemon Geaux
|
||||
2 | 102 | Wayz
|
||||
3 | 103 | Mynt
|
||||
4 | 104 | Wayz
|
||||
5 | 105 | Mynt
|
||||
6 | | Foo
|
||||
7 | | version-1.1
|
||||
8 | | version-2.1
|
||||
9 | | version-3.1
|
||||
10 | | version-4.1
|
||||
11 | | version-5.1
|
||||
12 | | version-6.1
|
||||
300 | | Wah
|
||||
400 | | version-1.2
|
||||
400 | | version-2.2
|
||||
400 | | version-3.2
|
||||
400 | | version-4.2
|
||||
400 | | version-5.2
|
||||
400 | | version-6.2
|
||||
(19 rows)
|
||||
|
||||
TRUNCATE app_analytics_events;
|
||||
-- Test multi-row insert with a dropped column
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
id | name
|
||||
----+------
|
||||
14 | Mynt
|
||||
13 | Wayz
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | name
|
||||
----+------
|
||||
13 | Wayz
|
||||
14 | Mynt
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with a dropped column before the partition column
|
||||
CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'name');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
id | name
|
||||
----+------
|
||||
3 | Mynt
|
||||
3 | Wayz
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events WHERE name = 'Wayz';
|
||||
id | name
|
||||
----+------
|
||||
3 | Wayz
|
||||
(1 row)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with serial in a reference table
|
||||
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
|
||||
SELECT create_reference_table('app_analytics_events');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
1 | 104 | Wayz
|
||||
2 | 105 | Mynt
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
1 | 104 | Wayz
|
||||
2 | 105 | Mynt
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with serial in a non-partition column
|
||||
CREATE TABLE app_analytics_events (id int, app_id serial, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id;
|
||||
name | app_id
|
||||
------+--------
|
||||
Mynt | 2
|
||||
Wayz | 1
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
98 | 2 | Mynt
|
||||
99 | 1 | Wayz
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- test UPDATE with subqueries
|
||||
CREATE TABLE raw_table (id bigint, value bigint);
|
||||
|
|
|
@ -447,6 +447,65 @@ INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURN
|
|||
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
|
||||
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
||||
|
||||
-- Test multi-row insert with serial in the partition column
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *;
|
||||
|
||||
PREPARE prep(varchar) AS
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *;
|
||||
|
||||
EXECUTE prep('version-1');
|
||||
EXECUTE prep('version-2');
|
||||
EXECUTE prep('version-3');
|
||||
EXECUTE prep('version-4');
|
||||
EXECUTE prep('version-5');
|
||||
EXECUTE prep('version-6');
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id, name;
|
||||
TRUNCATE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with a dropped column
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with a dropped column before the partition column
|
||||
CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'name');
|
||||
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events WHERE name = 'Wayz';
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with serial in a reference table
|
||||
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
|
||||
SELECT create_reference_table('app_analytics_events');
|
||||
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with serial in a non-partition column
|
||||
CREATE TABLE app_analytics_events (id int, app_id serial, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'id');
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- test UPDATE with subqueries
|
||||
|
|
Loading…
Reference in New Issue