mirror of https://github.com/citusdata/citus.git
Merge pull request #1613 from citusdata/fix_ref_table_multi_row_returning
Fix multi-row INSERT with RETURNING on reference tablespull/1618/head^2
commit
b4cc8939fc
|
@ -22,6 +22,7 @@
|
|||
#include "distributed/multi_master_planner.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/multi_resowner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
|
@ -204,27 +205,7 @@ RouterCreateScan(CustomScan *scan)
|
|||
static bool
|
||||
IsMultiRowInsert(Query *query)
|
||||
{
|
||||
ListCell *rteCell = NULL;
|
||||
bool hasValuesRTE = false;
|
||||
|
||||
CmdType commandType = query->commandType;
|
||||
if (commandType != CMD_INSERT)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach(rteCell, query->rtable)
|
||||
{
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
|
||||
|
||||
if (rte->rtekind == RTE_VALUES)
|
||||
{
|
||||
hasValuesRTE = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return hasValuesRTE;
|
||||
return ExtractDistributedInsertValuesRTE(query) != NULL;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -31,8 +31,7 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
static RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query,
|
||||
Oid distributedTableId);
|
||||
|
||||
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
|
||||
RangeTblEntry *valuesRTE, Task *task);
|
||||
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
|
||||
|
@ -47,8 +46,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
|
|||
{
|
||||
ListCell *taskCell = NULL;
|
||||
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
|
||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery,
|
||||
relationId);
|
||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
||||
|
||||
foreach(taskCell, taskList)
|
||||
{
|
||||
|
@ -108,50 +106,6 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractDistributedInsertValuesRTE does precisely that. If the provided
|
||||
* query is not an INSERT, or if the table is a reference table, or if the
|
||||
* INSERT does not have a VALUES RTE (i.e. it is not a multi-row INSERT), this
|
||||
* function returns NULL. If all those conditions are met, an RTE representing
|
||||
* the multiple values of a multi-row INSERT is returned.
|
||||
*/
|
||||
static RangeTblEntry *
|
||||
ExtractDistributedInsertValuesRTE(Query *query, Oid distributedTableId)
|
||||
{
|
||||
RangeTblEntry *valuesRTE = NULL;
|
||||
uint32 rangeTableId = 1;
|
||||
Var *partitionColumn = NULL;
|
||||
TargetEntry *targetEntry = NULL;
|
||||
|
||||
if (query->commandType != CMD_INSERT)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||
if (partitionColumn == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
targetEntry = get_tle_by_resno(query->targetList, partitionColumn->varattno);
|
||||
Assert(targetEntry != NULL);
|
||||
|
||||
if (IsA(targetEntry->expr, Var))
|
||||
{
|
||||
Var *partitionVar = (Var *) targetEntry->expr;
|
||||
|
||||
valuesRTE = rt_fetch(partitionVar->varno, query->rtable);
|
||||
if (valuesRTE->rtekind != RTE_VALUES)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return valuesRTE;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateTaskQueryString updates the query string stored within the provided
|
||||
* Task. If the Task has row values from a multi-row INSERT, those are injected
|
||||
|
@ -169,6 +123,7 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
|
|||
if (valuesRTE != NULL)
|
||||
{
|
||||
Assert(valuesRTE->rtekind == RTE_VALUES);
|
||||
Assert(task->rowValuesLists != NULL);
|
||||
|
||||
oldValuesLists = valuesRTE->values_lists;
|
||||
valuesRTE->values_lists = task->rowValuesLists;
|
||||
|
|
|
@ -1854,6 +1854,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
{
|
||||
int shardCount = 0;
|
||||
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
|
||||
RangeTblEntry *valuesRTE = NULL;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
ModifyRoute *modifyRoute = NULL;
|
||||
|
||||
|
@ -1867,7 +1868,17 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
modifyRoute = palloc(sizeof(ModifyRoute));
|
||||
|
||||
modifyRoute->shardId = shardInterval->shardId;
|
||||
modifyRoute->rowValuesLists = NIL;
|
||||
|
||||
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
/* add the values list for a multi-row INSERT */
|
||||
modifyRoute->rowValuesLists = valuesRTE->values_lists;
|
||||
}
|
||||
else
|
||||
{
|
||||
modifyRoute->rowValuesLists = NIL;
|
||||
}
|
||||
|
||||
modifyRouteList = lappend(modifyRouteList, modifyRoute);
|
||||
|
||||
|
@ -1986,6 +1997,39 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractDistributedInsertValuesRTE does precisely that. If the provided
|
||||
* query is not an INSERT, or if the INSERT does not have a VALUES RTE
|
||||
* (i.e. it is not a multi-row INSERT), this function returns NULL.
|
||||
* If all those conditions are met, an RTE representing the multiple values
|
||||
* of a multi-row INSERT is returned.
|
||||
*/
|
||||
RangeTblEntry *
|
||||
ExtractDistributedInsertValuesRTE(Query *query)
|
||||
{
|
||||
ListCell *rteCell = NULL;
|
||||
RangeTblEntry *valuesRTE = NULL;
|
||||
|
||||
if (query->commandType != CMD_INSERT)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
foreach(rteCell, query->rtable)
|
||||
{
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
|
||||
|
||||
if (rte->rtekind == RTE_VALUES)
|
||||
{
|
||||
valuesRTE = rte;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return valuesRTE;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IntersectPlacementList performs placement pruning based on matching on
|
||||
* nodeName:nodePort fields of shard placement data. We start pruning from all
|
||||
|
|
|
@ -48,6 +48,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
|||
extern Oid ExtractFirstDistributedTableId(Query *query);
|
||||
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
|
||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||
ShardInterval *shardInterval);
|
||||
|
||||
|
|
|
@ -888,9 +888,14 @@ SELECT create_reference_table('reference_summary_table');
|
|||
INSERT INTO reference_raw_table VALUES (1, 100);
|
||||
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||
INSERT INTO reference_raw_table VALUES (1, 300);
|
||||
INSERT INTO reference_raw_table VALUES (2, 400);
|
||||
INSERT INTO reference_raw_table VALUES (2, 500);
|
||||
INSERT INTO reference_raw_table VALUES (1,300), (2, 400), (2,500) RETURNING *;
|
||||
id | value
|
||||
----+-------
|
||||
1 | 300
|
||||
2 | 400
|
||||
2 | 500
|
||||
(3 rows)
|
||||
|
||||
INSERT INTO reference_summary_table VALUES (1);
|
||||
INSERT INTO reference_summary_table VALUES (2);
|
||||
SELECT * FROM reference_summary_table ORDER BY id;
|
||||
|
|
|
@ -588,9 +588,7 @@ SELECT create_reference_table('reference_summary_table');
|
|||
INSERT INTO reference_raw_table VALUES (1, 100);
|
||||
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||
INSERT INTO reference_raw_table VALUES (1, 300);
|
||||
INSERT INTO reference_raw_table VALUES (2, 400);
|
||||
INSERT INTO reference_raw_table VALUES (2, 500);
|
||||
INSERT INTO reference_raw_table VALUES (1,300), (2, 400), (2,500) RETURNING *;
|
||||
|
||||
INSERT INTO reference_summary_table VALUES (1);
|
||||
INSERT INTO reference_summary_table VALUES (2);
|
||||
|
|
Loading…
Reference in New Issue