Merge pull request #1517 from citusdata/feature/multi_row_insert

Enable multi-row INSERTs

cr: @marcocitus
Jason Petersen 2017-08-10 01:25:13 -07:00 committed by GitHub
commit 4e8d07c672
30 changed files with 860 additions and 266 deletions
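For context, the feature lets a single multi-row INSERT against a distributed table be planned, grouped by target shard, and executed as router tasks. A minimal sketch of the new behavior, using the same shard-creation API the regression tests below use (table name, column names, and shard counts are illustrative):

CREATE TABLE orders (order_id bigint, customer_id int, total numeric);
SELECT master_create_distributed_table('orders', 'order_id', 'hash');
SELECT master_create_worker_shards('orders', '4', '2');

-- rows may hash to different shards; the planner groups them by target shard
INSERT INTO orders (order_id, customer_id, total)
VALUES (1, 10, 19.99),
       (2, 20, 5.00),
       (3, 30, 42.50)
ON CONFLICT DO NOTHING;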


@ -66,7 +66,8 @@ static CustomExecMethods RouterSingleModifyCustomExecMethods = {
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterMultiModifyCustomExecMethods = {
/* not static to enable reference by multi-modify logic in router execution */
CustomExecMethods RouterMultiModifyCustomExecMethods = {
.CustomName = "RouterMultiModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterMultiModifyExecScan,


@ -419,6 +419,11 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
RaiseDeferredError(planningError, ERROR);
}
if (list_length(taskList) > 1)
{
node->methods = &RouterMultiModifyCustomExecMethods;
}
workerJob->taskList = taskList;
}
@ -428,7 +433,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
/* assign task placements */
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}


@ -26,11 +26,15 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "parser/parsetree.h"
#include "storage/lock.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
static RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query,
Oid distributedTableId);
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
RangeTblEntry *valuesRTE, Task *task);
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
@ -43,11 +47,12 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
{
ListCell *taskCell = NULL;
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery,
relationId);
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
StringInfo newQueryString = makeStringInfo();
Query *query = originalQuery;
if (task->insertSelectQuery)
@ -90,31 +95,108 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
}
}
/*
* For INSERT queries, we only have one relation to update, so we can
* use deparse_shard_query(). For UPDATE and DELETE queries, we may have
* subqueries and joins, so we use relation shard list to update shard
* names and call pg_get_query_def() directly.
*/
if (query->commandType == CMD_INSERT)
{
deparse_shard_query(query, relationId, task->anchorShardId, newQueryString);
}
else
{
List *relationShardList = task->relationShardList;
UpdateRelationToShardNames((Node *) query, relationShardList);
ereport(DEBUG4, (errmsg("query before rebuilding: %s", task->queryString)));
pg_get_query_def(query, newQueryString);
}
UpdateTaskQueryString(query, relationId, valuesRTE, task);
ereport(DEBUG4, (errmsg("distributed statement: %s", newQueryString->data)));
task->queryString = newQueryString->data;
ereport(DEBUG4, (errmsg("query after rebuilding: %s", task->queryString)));
}
}
/*
* ExtractDistributedInsertValuesRTE extracts the VALUES range table entry of a
* multi-row INSERT, if there is one. If the provided query is not an INSERT,
* if the table is a reference table, or if the INSERT has no VALUES RTE (i.e.
* it is not a multi-row INSERT), this function returns NULL. Otherwise, the
* RTE holding the rows of the multi-row INSERT is returned.
*/
static RangeTblEntry *
ExtractDistributedInsertValuesRTE(Query *query, Oid distributedTableId)
{
RangeTblEntry *valuesRTE = NULL;
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
TargetEntry *targetEntry = NULL;
if (query->commandType != CMD_INSERT)
{
return NULL;
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
if (partitionColumn == NULL)
{
return NULL;
}
targetEntry = get_tle_by_resno(query->targetList, partitionColumn->varattno);
Assert(targetEntry != NULL);
if (IsA(targetEntry->expr, Var))
{
Var *partitionVar = (Var *) targetEntry->expr;
valuesRTE = rt_fetch(partitionVar->varno, query->rtable);
if (valuesRTE->rtekind != RTE_VALUES)
{
return NULL;
}
}
return valuesRTE;
}
/*
* UpdateTaskQueryString updates the query string stored within the provided
* Task. If the Task has row values from a multi-row INSERT, those are injected
* into the provided query (using the provided valuesRTE, which must belong to
* the query) before deparse occurs (the query's full VALUES list will be
* restored before this function returns).
*/
static void
UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE,
Task *task)
{
StringInfo queryString = makeStringInfo();
List *oldValuesLists = NIL;
if (valuesRTE != NULL)
{
Assert(valuesRTE->rtekind == RTE_VALUES);
oldValuesLists = valuesRTE->values_lists;
valuesRTE->values_lists = task->rowValuesLists;
}
/*
* For INSERT queries, we only have one relation to update, so we can
* use deparse_shard_query(). For UPDATE and DELETE queries, we may have
* subqueries and joins, so we use relation shard list to update shard
* names and call pg_get_query_def() directly.
*/
if (query->commandType == CMD_INSERT)
{
deparse_shard_query(query, distributedTableId, task->anchorShardId, queryString);
}
else
{
List *relationShardList = task->relationShardList;
UpdateRelationToShardNames((Node *) query, relationShardList);
pg_get_query_def(query, queryString);
}
if (valuesRTE != NULL)
{
valuesRTE->values_lists = oldValuesLists;
}
task->queryString = queryString->data;
}
/*
* UpdateRelationToShardNames walks over the query tree and appends shard ids to
* relations. It uses unique identity value to establish connection between a
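To make the deparse path above concrete: each task of a multi-row INSERT carries only the rows routed to its shard, and UpdateTaskQueryString swaps those rows into the VALUES RTE just before deparsing. Roughly, a statement like the following could yield two per-shard query strings (table, schema, and shard identifiers are illustrative):

INSERT INTO orders (order_id, total) VALUES (1, 10.00), (2, 20.00), (5, 50.00);

-- possible deparsed task query strings, one per target shard
INSERT INTO public.orders_102010 (order_id, total) VALUES (1, 10.00), (5, 50.00);
INSERT INTO public.orders_102011 (order_id, total) VALUES (2, 20.00);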


@ -71,6 +71,29 @@
#include "catalog/pg_proc.h"
#include "optimizer/planmain.h"
/* intermediate value for INSERT processing */
typedef struct InsertValues
{
Expr *partitionValueExpr; /* partition value provided in INSERT row */
List *rowValues; /* full values list of INSERT row, possibly NIL */
int64 shardId; /* target shard for this row, possibly invalid */
Index listIndex; /* index to make our sorting stable */
} InsertValues;
/*
* A ModifyRoute encapsulates the information needed to route modifications
* to the appropriate shard. For a single-shard modification, only one route
* is needed, but in the case of e.g. a multi-row INSERT, lists of these values
* will help divide the rows by their destination shards, permitting later
* shard-and-row-specific extension of the original SQL.
*/
typedef struct ModifyRoute
{
int64 shardId; /* identifier of target shard */
List *rowValuesLists; /* for multi-row INSERTs, list of rows to be inserted */
} ModifyRoute;
typedef struct WalkerState
{
@ -99,9 +122,6 @@ static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry);
static bool CanShardPrune(Oid distributedTableId, Query *query);
static Job * CreateJob(Query *query);
static Task * CreateTask(TaskType taskType);
static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError);
static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Job * RouterJob(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
DeferredErrorMessage **planningError);
@ -110,6 +130,9 @@ static List * TargetShardIntervalsForRouter(Query *query,
RelationRestrictionContext *restrictionContext,
bool *multiShardQuery);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
static List * GroupInsertValuesByShardId(List *insertValuesList);
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
static bool MultiRouterPlannableQuery(Query *query,
RelationRestrictionContext *restrictionContext);
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
@ -120,6 +143,8 @@ static bool SelectsFromDistributedTable(List *rangeTableList);
#if (PG_VERSION_NUM >= 100000)
static List * get_all_actual_clauses(List *restrictinfo_list);
#endif
static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement);
/*
@ -471,7 +496,6 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
bool isCoordinator = IsCoordinator();
List *rangeTableList = NIL;
ListCell *rangeTableCell = NULL;
bool hasValuesScan = false;
uint32 queryTableCount = 0;
bool specifiesPartitionValue = false;
ListCell *setTargetCell = NULL;
@ -555,7 +579,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
}
else if (rangeTableEntry->rtekind == RTE_VALUES)
{
hasValuesScan = true;
/* do nothing, this type is supported */
}
else
{
@ -626,24 +650,6 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
}
}
/* reject queries which involve multi-row inserts */
if (hasValuesScan)
{
/*
* NB: If you remove this check you must also change the checks further in this
* method and ensure that VOLATILE function calls aren't allowed in INSERT
* statements. Currently they're allowed but the function call is replaced
* with a constant, and if you're inserting multiple rows at once the function
* should return a different value for each row.
*/
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot perform distributed planning for the given"
" modification",
"Multi-row INSERTs to distributed tables are not "
"supported.",
NULL);
}
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
@ -1143,7 +1149,8 @@ CanShardPrune(Oid distributedTableId, Query *query)
{
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
Expr *partitionValueExpr = NULL;
List *insertValuesList = NIL;
ListCell *insertValuesCell = NULL;
if (query->commandType != CMD_INSERT)
{
@ -1158,14 +1165,19 @@ CanShardPrune(Oid distributedTableId, Query *query)
return true;
}
partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
if (IsA(partitionValueExpr, Const))
/* get full list of partition values and ensure they are all Consts */
insertValuesList = ExtractInsertValuesList(query, partitionColumn);
foreach(insertValuesCell, insertValuesList)
{
/* can do shard pruning if the partition column is constant */
return true;
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
if (!IsA(insertValues->partitionValueExpr, Const))
{
/* can't do shard pruning if the partition column is not constant */
return false;
}
}
return false;
return true;
}
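In other words, pruning is now deferred rather than rejected when any row's partition value is still an expression: CanShardPrune returns false, the coordinator evaluates the functions first (see the master-evaluation changes further below), and planning then proceeds with constants. The regression tests in this change exercise exactly that, e.g.:

INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50),
                                (random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50);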
@ -1198,8 +1210,9 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
List *
RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
{
ShardInterval *shardInterval = NULL;
Task *modifyTask = NULL;
List *insertTaskList = NIL;
List *modifyRouteList = NIL;
ListCell *modifyRouteCell = NULL;
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
@ -1208,26 +1221,30 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
Assert(query->commandType == CMD_INSERT);
shardInterval = FindShardForInsert(query, cacheEntry, planningError);
modifyRouteList = BuildRoutesForInsert(query, planningError);
if (*planningError != NULL)
{
return NIL;
}
/* an INSERT always routes to exactly one shard */
Assert(shardInterval != NULL);
modifyTask = CreateTask(MODIFY_TASK);
modifyTask->anchorShardId = shardInterval->shardId;
modifyTask->replicationModel = cacheEntry->replicationModel;
if (query->onConflict != NULL)
foreach(modifyRouteCell, modifyRouteList)
{
modifyTask->upsertQuery = true;
ModifyRoute *modifyRoute = (ModifyRoute *) lfirst(modifyRouteCell);
Task *modifyTask = CreateTask(MODIFY_TASK);
modifyTask->anchorShardId = modifyRoute->shardId;
modifyTask->replicationModel = cacheEntry->replicationModel;
modifyTask->rowValuesLists = modifyRoute->rowValuesLists;
if (query->onConflict != NULL)
{
modifyTask->upsertQuery = true;
}
insertTaskList = lappend(insertTaskList, modifyTask);
}
return list_make1(modifyTask);
return insertTaskList;
}
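Because RouterInsertTaskList now emits one task per ModifyRoute, a multi-row INSERT whose rows prune to different shards is reported as multiple tasks. A hedged sketch of what EXPLAIN could show in that case (shard names are made up; the actual task count depends on the shard layout):

EXPLAIN (COSTS FALSE)
INSERT INTO orders VALUES (1, 10.00), (2, 20.00);
Custom Scan (Citus Router)
Task Count: 2
Tasks Shown: All
-> Task
-> Insert on orders_102010
-> Values Scan on "*VALUES*"
-> Task
-> Insert on orders_102011
-> Values Scan on "*VALUES*"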
@ -1264,132 +1281,6 @@ CreateTask(TaskType taskType)
}
/*
* FindShardForInsert returns the shard interval for an INSERT query or NULL if
* the partition column value is defined as an expression that still needs to be
* evaluated. If the partition column value falls within 0 or multiple
* (overlapping) shards, the planningError is set.
*/
static ShardInterval *
FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError)
{
Oid distributedTableId = cacheEntry->relationId;
char partitionMethod = cacheEntry->partitionMethod;
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
Expr *partitionValueExpr = NULL;
Const *partitionValueConst = NULL;
int prunedShardCount = 0;
List *prunedShardList = NIL;
Assert(query->commandType == CMD_INSERT);
/* reference tables do not have a partition column, but can only have one shard */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
int shardCount = cacheEntry->shardIntervalArrayLength;
if (shardCount != 1)
{
ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
}
return cacheEntry->sortedShardIntervalArray[0];
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
/* non-constants should have been caught by CanShardPrune */
if (!IsA(partitionValueExpr, Const))
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT with a non-constant in the "
"partition column")));
}
partitionValueConst = (Const *) partitionValueExpr;
if (partitionValueConst->constisnull)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT with NULL in the partition "
"column")));
}
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
Datum partitionValue = partitionValueConst->constvalue;
ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
if (shardInterval != NULL)
{
prunedShardList = list_make1(shardInterval);
}
}
else
{
List *restrictClauseList = NIL;
Index tableId = 1;
OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) equalityExpr);
Const *rightConst = (Const *) rightOp;
Assert(IsA(rightOp, Const));
rightConst->constvalue = partitionValueConst->constvalue;
rightConst->constisnull = partitionValueConst->constisnull;
rightConst->constbyval = partitionValueConst->constbyval;
restrictClauseList = list_make1(equalityExpr);
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
}
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
{
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *targetCountType = NULL;
if (prunedShardCount == 0)
{
targetCountType = "no";
}
else
{
targetCountType = "multiple";
}
if (prunedShardCount == 0)
{
appendStringInfo(errorHint, "Make sure you have created a shard which "
"can receive this partition column value.");
}
else
{
appendStringInfo(errorHint, "Make sure the value for partition column "
"\"%s\" falls into a single shard.",
partitionColumnName);
}
appendStringInfo(errorMessage, "cannot run INSERT command which targets %s "
"shards", targetCountType);
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
errorHint->data);
return NULL;
}
return (ShardInterval *) linitial(prunedShardList);
}
/*
* ExtractFirstDistributedTableId takes a given query, and finds the relationId
* for the first distributed table in that query. If the function cannot find a
@ -1420,27 +1311,6 @@ ExtractFirstDistributedTableId(Query *query)
}
/*
* ExtractPartitionValue extracts the partition column value from a the target
* of an INSERT command. If a partition value is missing altogether or is
* NULL, this function throws an error.
*/
static Expr *
ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
{
TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
partitionColumn->varattno);
if (targetEntry == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT without a partition column "
"value")));
}
return targetEntry->expr;
}
/* RouterJob builds a Job to represent a single shard select/update/delete query */
static Job *
RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
@ -1957,6 +1827,167 @@ WorkersContainingAllShards(List *prunedShardIntervalsList)
}
/*
* BuildRoutesForInsert returns a list of ModifyRoute objects for an INSERT
* query, or an empty list if the partition column value is defined as an
* expression that still needs to be evaluated. If any partition column value
* falls into zero or multiple (overlapping) shards, the planning error is set.
*
* Multi-row INSERTs are handled by grouping their rows by target shard. These
* groups are returned in ascending order by shard id, ready for later deparse
* to shard-specific SQL.
*/
static List *
BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod;
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
List *insertValuesList = NIL;
List *modifyRouteList = NIL;
ListCell *insertValuesCell = NULL;
Assert(query->commandType == CMD_INSERT);
/* reference tables can only have one shard */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
int shardCount = 0;
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
ShardInterval *shardInterval = NULL;
ModifyRoute *modifyRoute = NULL;
shardCount = list_length(shardIntervalList);
if (shardCount != 1)
{
ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
}
shardInterval = linitial(shardIntervalList);
modifyRoute = palloc(sizeof(ModifyRoute));
modifyRoute->shardId = shardInterval->shardId;
modifyRoute->rowValuesLists = NIL;
modifyRouteList = lappend(modifyRouteList, modifyRoute);
return modifyRouteList;
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
/* get full list of insert values and iterate over them to prune */
insertValuesList = ExtractInsertValuesList(query, partitionColumn);
foreach(insertValuesCell, insertValuesList)
{
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
Const *partitionValueConst = NULL;
List *prunedShardList = NIL;
int prunedShardCount = 0;
ShardInterval *targetShard = NULL;
if (!IsA(insertValues->partitionValueExpr, Const))
{
/* shard pruning not possible right now */
return NIL;
}
partitionValueConst = (Const *) insertValues->partitionValueExpr;
if (partitionValueConst->constisnull)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT with NULL in the partition "
"column")));
}
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_RANGE)
{
Datum partitionValue = partitionValueConst->constvalue;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
distributedTableId);
ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
if (shardInterval != NULL)
{
prunedShardList = list_make1(shardInterval);
}
}
else
{
List *restrictClauseList = NIL;
Index tableId = 1;
OpExpr *equalityExpr = MakeOpExpression(partitionColumn,
BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) equalityExpr);
Const *rightConst = (Const *) rightOp;
Assert(IsA(rightOp, Const));
rightConst->constvalue = partitionValueConst->constvalue;
rightConst->constisnull = partitionValueConst->constisnull;
rightConst->constbyval = partitionValueConst->constbyval;
restrictClauseList = list_make1(equalityExpr);
prunedShardList = PruneShards(distributedTableId, tableId,
restrictClauseList);
}
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
{
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *targetCountType = NULL;
if (prunedShardCount == 0)
{
targetCountType = "no";
}
else
{
targetCountType = "multiple";
}
if (prunedShardCount == 0)
{
appendStringInfo(errorHint, "Make sure you have created a shard which "
"can receive this partition column value.");
}
else
{
appendStringInfo(errorHint, "Make sure the value for partition column "
"\"%s\" falls into a single shard.",
partitionColumnName);
}
appendStringInfo(errorMessage, "cannot run INSERT command which targets %s "
"shards", targetCountType);
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
errorHint->data);
return NIL;
}
targetShard = (ShardInterval *) linitial(prunedShardList);
insertValues->shardId = targetShard->shardId;
}
modifyRouteList = GroupInsertValuesByShardId(insertValuesList);
return modifyRouteList;
}
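One behavior worth noting from the routing loop above: a NULL partition value in any row aborts planning with an error rather than producing a route. For example, against the upsert_test table from the regression tests (part_key is the distribution column):

INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (NULL, 2);
ERROR: cannot perform an INSERT with NULL in the partition column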
/*
* IntersectPlacementList performs placement pruning based on matching on
* nodeName:nodePort fields of shard placement data. We start pruning from all
@ -1993,6 +2024,132 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
}
/*
* GroupInsertValuesByShardId groups the rows of a multi-row INSERT by target
* shard. At this point, all pruning has taken place and we only need to build
* sets of rows for each destination. This is done by a simple sort (by shard
* identifier) and gather step. The sort has the side effect of placing the
* groups in ascending shard id order, which avoids unnecessary deadlocks
* during Task execution.
*/
static List *
GroupInsertValuesByShardId(List *insertValuesList)
{
ModifyRoute *route = NULL;
ListCell *insertValuesCell = NULL;
List *modifyRouteList = NIL;
insertValuesList = SortList(insertValuesList, CompareInsertValuesByShardId);
foreach(insertValuesCell, insertValuesList)
{
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
int64 shardId = insertValues->shardId;
bool foundSameShardId = false;
if (route != NULL)
{
if (route->shardId == shardId)
{
foundSameShardId = true;
}
else
{
/* new shard id seen; current aggregation done; add to list */
modifyRouteList = lappend(modifyRouteList, route);
}
}
if (foundSameShardId)
{
/*
* Our current value has the same shard id as our aggregate object,
* so append the rowValues.
*/
route->rowValuesLists = lappend(route->rowValuesLists,
insertValues->rowValues);
}
else
{
/* we encountered a new shard id; build a new aggregate object */
route = (ModifyRoute *) palloc(sizeof(ModifyRoute));
route->shardId = insertValues->shardId;
route->rowValuesLists = list_make1(insertValues->rowValues);
}
}
/* left holding one final aggregate object; add to list */
modifyRouteList = lappend(modifyRouteList, route);
return modifyRouteList;
}
/*
* ExtractInsertValuesList extracts the partition column values of an INSERT
* command and returns them as a list of InsertValues structs. A single-row
* INSERT yields a single-element list built from the target list; a multi-row
* INSERT yields one InsertValues per row, each carrying the full row values in
* addition to the partition value. If the partition value is missing
* altogether, this function errors out.
*/
static List *
ExtractInsertValuesList(Query *query, Var *partitionColumn)
{
List *insertValuesList = NIL;
TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
partitionColumn->varattno);
if (targetEntry == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT without a partition column "
"value")));
}
/*
* We've got a multi-row INSERT. PostgreSQL internally represents such
* commands by linking Vars in the target list to lists of values within
* a special VALUES range table entry. By extracting the right positional
* expression from each list within that RTE, we will extract the partition
* values for each row within the multi-row INSERT.
*/
if (IsA(targetEntry->expr, Var))
{
Var *partitionVar = (Var *) targetEntry->expr;
RangeTblEntry *referencedRTE = NULL;
ListCell *valuesListCell = NULL;
Index ivIndex = 0;
referencedRTE = rt_fetch(partitionVar->varno, query->rtable);
foreach(valuesListCell, referencedRTE->values_lists)
{
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
insertValues->rowValues = (List *) lfirst(valuesListCell);
insertValues->partitionValueExpr = list_nth(insertValues->rowValues,
(partitionVar->varattno - 1));
insertValues->shardId = INVALID_SHARD_ID;
insertValues->listIndex = ivIndex;
insertValuesList = lappend(insertValuesList, insertValues);
ivIndex++;
}
}
/* nothing's been found yet; this is a simple single-row INSERT */
if (insertValuesList == NIL)
{
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
insertValues->rowValues = NIL;
insertValues->partitionValueExpr = targetEntry->expr;
insertValues->shardId = INVALID_SHARD_ID;
insertValuesList = lappend(insertValuesList, insertValues);
}
return insertValuesList;
}
/*
* MultiRouterPlannableQuery returns true if given query can be router plannable.
* The query is router plannable if it is a modify query, or if its is a select
@ -2170,3 +2327,44 @@ get_all_actual_clauses(List *restrictinfo_list)
#endif
/*
* CompareInsertValuesByShardId does what it says in the name. Used for sorting
* InsertValues objects by their shard.
*/
static int
CompareInsertValuesByShardId(const void *leftElement, const void *rightElement)
{
InsertValues *leftValue = *((InsertValues **) leftElement);
InsertValues *rightValue = *((InsertValues **) rightElement);
int64 leftShardId = leftValue->shardId;
int64 rightShardId = rightValue->shardId;
Index leftIndex = leftValue->listIndex;
Index rightIndex = rightValue->listIndex;
if (leftShardId > rightShardId)
{
return 1;
}
else if (leftShardId < rightShardId)
{
return -1;
}
else
{
/* shard identifiers are the same, list index is secondary sort key */
if (leftIndex > rightIndex)
{
return 1;
}
else if (leftIndex < rightIndex)
{
return -1;
}
else
{
return 0;
}
}
}


@ -32,6 +32,7 @@ typedef struct FunctionEvaluationContext
/* private function declarations */
static void EvaluateValuesListsItems(List *valuesLists, PlanState *planState);
static Node * EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState);
static Node * PartiallyEvaluateExpressionMutator(Node *expression,
FunctionEvaluationContext *context);
@ -63,14 +64,19 @@ RequiresMasterEvaluation(Query *query)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
if (rte->rtekind != RTE_SUBQUERY)
if (rte->rtekind == RTE_SUBQUERY)
{
continue;
if (RequiresMasterEvaluation(rte->subquery))
{
return true;
}
}
if (RequiresMasterEvaluation(rte->subquery))
else if (rte->rtekind == RTE_VALUES)
{
return true;
if (contain_mutable_functions((Node *) rte->values_lists))
{
return true;
}
}
}
@ -131,12 +137,14 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
if (rte->rtekind != RTE_SUBQUERY)
if (rte->rtekind == RTE_SUBQUERY)
{
continue;
ExecuteMasterEvaluableFunctions(rte->subquery, planState);
}
else if (rte->rtekind == RTE_VALUES)
{
EvaluateValuesListsItems(rte->values_lists, planState);
}
ExecuteMasterEvaluableFunctions(rte->subquery, planState);
}
foreach(cteCell, query->cteList)
@ -148,6 +156,35 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
}
/*
* EvaluateValuesListsItems simply does the work of walking over each expression
* in each value list contained in a multi-row INSERT's VALUES RTE. Basically
* a nested for loop to perform an in-place replacement of expressions with
* their ultimate values, should evaluation be necessary.
*/
static void
EvaluateValuesListsItems(List *valuesLists, PlanState *planState)
{
ListCell *exprListCell = NULL;
foreach(exprListCell, valuesLists)
{
List *exprList = (List *) lfirst(exprListCell);
ListCell *exprCell = NULL;
foreach(exprCell, exprList)
{
Expr *expr = (Expr *) lfirst(exprCell);
Node *modifiedNode = NULL;
modifiedNode = PartiallyEvaluateExpression((Node *) expr, planState);
exprCell->data.ptr_value = (void *) modifiedNode;
}
}
}
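Together with the RequiresMasterEvaluation change above, this means mutable function calls inside a VALUES list are evaluated on the coordinator before the per-shard query strings are built, so each shard receives plain constants. The regression tests in this change rely on that, e.g.:

INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
                                (22038, 'GOOG', 5634, now(), 'buy', 2.50),
                                (22039, 'GOOG', 5634, now(), 'buy', 1.50);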
/*
* Walks the expression evaluating any node which invokes a function as long as a Var
* doesn't show up in the parameter list.


@ -232,6 +232,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(replicationModel);
COPY_SCALAR_FIELD(insertSelectQuery);
COPY_NODE_FIELD(relationShardList);
COPY_NODE_FIELD(rowValuesLists);
}


@ -441,6 +441,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(relationShardList);
WRITE_NODE_FIELD(rowValuesLists);
}


@ -351,6 +351,7 @@ ReadTask(READFUNC_ARGS)
READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(relationShardList);
READ_NODE_FIELD(rowValuesLists);
READ_DONE();
}


@ -28,6 +28,8 @@ typedef struct CitusScanState
} CitusScanState;
extern CustomExecMethods RouterMultiModifyCustomExecMethods;
extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan);


@ -187,6 +187,8 @@ typedef struct Task
bool insertSelectQuery;
List *relationShardList;
List *rowValuesLists; /* rows to use when building multi-row INSERT */
} Task;


@ -28,3 +28,62 @@ step s1-insert:
step s2-update:
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
starting permutation: s1-begin s1-multi-insert s2-update s1-commit
master_create_worker_shards
step s1-begin:
BEGIN;
step s1-multi-insert:
INSERT INTO test_concurrent_dml VALUES (1), (2);
step s2-update:
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
<waiting ...>
step s1-commit:
COMMIT;
step s2-update: <... completed>
starting permutation: s1-begin s1-multi-insert s2-multi-insert-overlap s1-commit
master_create_worker_shards
step s1-begin:
BEGIN;
step s1-multi-insert:
INSERT INTO test_concurrent_dml VALUES (1), (2);
step s2-multi-insert-overlap:
INSERT INTO test_concurrent_dml VALUES (1), (4);
<waiting ...>
step s1-commit:
COMMIT;
step s2-multi-insert-overlap: <... completed>
starting permutation: s1-begin s2-begin s1-multi-insert s2-multi-insert s1-commit s2-commit
master_create_worker_shards
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-multi-insert:
INSERT INTO test_concurrent_dml VALUES (1), (2);
step s2-multi-insert:
INSERT INTO test_concurrent_dml VALUES (3), (4);
step s1-commit:
COMMIT;
step s2-commit:
COMMIT;


@ -98,7 +98,7 @@ step s1-get-current-transaction-id:
row
(0,290)
(0,301)
step s2-get-first-worker-active-transactions:
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM
@ -109,4 +109,4 @@ step s2-get-first-worker-active-transactions:
nodename nodeport success result
localhost 57637 t (0,290)
localhost 57637 t (0,301)


@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
293 292 f
304 303 f
transactionnumberwaitingtransactionnumbers
292
293 292
303
304 303
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
297 296 f
298 296 f
298 297 t
308 307 f
309 307 f
309 308 t
transactionnumberwaitingtransactionnumbers
296
297 296
298 296,297
307
308 307
309 307,308
step s1-abort:
ABORT;


@ -16,7 +16,7 @@ step s1-finish:
COMMIT;
step s2-insert: <... completed>
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102781"
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102793"
step s2-finish:
COMMIT;


@ -299,14 +299,14 @@ Limit
Filter: (l_quantity < 5.0)
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0);
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-> Values Scan on "*VALUES*"
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem


@ -299,14 +299,14 @@ Limit
Filter: (l_quantity < 5.0)
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0);
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-> Values Scan on "*VALUES*"
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem


@ -1657,6 +1657,12 @@ BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
ROLLBACK;
-- Similarly, multi-row INSERTs will take part in transactions and reuse connections...
BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first (user_id, value_1) VALUES (105, 105), (106, 106);
ROLLBACK;
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES


@ -190,7 +190,7 @@ CONTEXT: while executing command on localhost:57638
INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
ERROR: could not modify any active placements
SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
-- commands with non-constant partition values are supported
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58);
-- values for other columns are totally fine
@ -201,10 +201,36 @@ ERROR: functions used in the WHERE clause of modification queries on distribute
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- multi-row inserts are supported
INSERT INTO limit_orders VALUES (12037, 'GOOG', 5634, '2001-04-16 03:37:28', 'buy', 0.50),
(12038, 'GOOG', 5634, '2001-04-17 03:37:28', 'buy', 2.50),
(12039, 'GOOG', 5634, '2001-04-18 03:37:28', 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 12037 AND 12039;
count
-------
3
(1 row)
-- even those with functions
INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
(22038, 'GOOG', 5634, now(), 'buy', 2.50),
(22039, 'GOOG', 5634, now(), 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039;
count
-------
3
(1 row)
-- even those with functions in their partition columns
INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50),
(random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50),
(random() * 10 + 80090, 'GOOG', 5634, now(), 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000;
count
-------
3
(1 row)
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported
-- INSERT INTO limit_orders SELECT * FROM limit_orders;


@ -45,16 +45,28 @@ INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
-- replace a researcher, reusing their id
-- replace a researcher, reusing their id in a multi-row INSERT
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
INSERT INTO researchers VALUES (2, 1, 'John Backus');
INSERT INTO researchers VALUES (2, 1, 'John Backus'), (12, 1, 'Frances E. Allen');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
name
-------------
SELECT name FROM researchers WHERE lab_id = 1 AND id % 10 = 2;
name
------------------
John Backus
(1 row)
Frances E. Allen
(2 rows)
-- and the other way around
BEGIN;
INSERT INTO researchers VALUES (14, 2, 'Alan Kay'), (15, 2, 'Barbara Liskov');
DELETE FROM researchers WHERE id = 14 AND lab_id = 2;
ROLLBACK;
-- should have rolled everything back
SELECT * FROM researchers WHERE id = 15 AND lab_id = 2;
id | lab_id | name
----+--------+------
(0 rows)
-- abort a modification
BEGIN;
@ -209,6 +221,11 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
ROLLBACK;
-- but it is allowed after a multi-row insert
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
ROLLBACK;
-- after a COPY you can modify multiple shards, since they'll use different connections
BEGIN;
\copy researchers from stdin delimiter ','


@ -104,10 +104,10 @@ ERROR: functions used in the WHERE clause of modification queries on distribute
-- commands with mutable but non-volatile functions(ie: stable func.) in their quals
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported
INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- commands with multiple rows are supported
INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()),
(2038, 'GOOG', 5634, now(), 'buy', random()),
(2039, 'GOOG', 5634, now(), 'buy', random());
-- connect back to the other node
\c - - - :worker_1_port
-- commands containing a CTE are unsupported


@ -383,6 +383,15 @@ EXECUTE prepared_double_parameter_insert(3, 30);
EXECUTE prepared_double_parameter_insert(4, 40);
EXECUTE prepared_double_parameter_insert(5, 50);
EXECUTE prepared_double_parameter_insert(6, 60);
PREPARE prepared_multi_insert(int, int) AS
INSERT INTO prepare_table (key, value) VALUES ($1, $2), ($1 + 1, $2 + 10);
-- execute 6 times to trigger prepared statement usage
EXECUTE prepared_multi_insert( 7, 70);
EXECUTE prepared_multi_insert( 9, 90);
EXECUTE prepared_multi_insert(11, 110);
EXECUTE prepared_multi_insert(13, 130);
EXECUTE prepared_multi_insert(15, 150);
EXECUTE prepared_multi_insert(17, 170);
PREPARE prepared_non_partition_parameter_insert(int) AS
INSERT INTO prepare_table (key, value) VALUES (0, $1);
-- execute 6 times to trigger prepared statement usage
@ -420,7 +429,25 @@ SELECT * FROM prepare_table ORDER BY key, value;
5 |
6 | 60
6 |
(24 rows)
7 | 70
8 | 80
9 | 90
10 | 100
11 | 110
12 | 120
13 | 130
14 | 140
15 | 150
16 | 160
17 | 170
18 | 180
(36 rows)
SELECT master_modify_multiple_shards('DELETE FROM prepare_table WHERE value >= 70');
master_modify_multiple_shards
-------------------------------
12
(1 row)
-- check router executor select
PREPARE prepared_router_partition_column_select(int) AS


@ -20,7 +20,7 @@ SELECT master_create_worker_shards('upsert_test', '4', '2');
(1 row)
-- do a regular insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1);
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2);
-- observe that there is a conflict and the following query does nothing
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING;
-- same as the above with different syntax
@ -35,8 +35,27 @@ SELECT * FROM upsert_test;
part_key | other_col | third_col
----------+-----------+-----------
1 | 2 | 4
(1 row)
2 | 2 |
(2 rows)
-- do a multi-row DO NOTHING insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2)
ON CONFLICT DO NOTHING;
-- do a multi-row DO UPDATE insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 10), (2, 20), (3, 30)
ON CONFLICT (part_key) DO
UPDATE SET other_col = EXCLUDED.other_col WHERE upsert_test.part_key != 1;
-- see the results
SELECT * FROM upsert_test ORDER BY part_key ASC;
part_key | other_col | third_col
----------+-----------+-----------
1 | 2 | 4
2 | 20 |
3 | 30 |
(3 rows)
DELETE FROM upsert_test WHERE part_key = 2;
DELETE FROM upsert_test WHERE part_key = 3;
-- use a WHERE clause, so that SET doesn't have an effect
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key)
DO UPDATE SET other_col = 30 WHERE upsert_test.other_col = 3;


@ -22,6 +22,11 @@ step "s1-insert"
INSERT INTO test_concurrent_dml VALUES(1);
}
step "s1-multi-insert"
{
INSERT INTO test_concurrent_dml VALUES (1), (2);
}
step "s1-commit"
{
COMMIT;
@ -29,12 +34,42 @@ step "s1-commit"
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-update"
{
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
}
step "s2-multi-insert-overlap"
{
INSERT INTO test_concurrent_dml VALUES (1), (4);
}
step "s2-multi-insert"
{
INSERT INTO test_concurrent_dml VALUES (3), (4);
}
step "s2-commit"
{
COMMIT;
}
# verify that an in-progress insert blocks concurrent updates
permutation "s1-begin" "s1-insert" "s2-update" "s1-commit"
# but an insert without xact will not block
permutation "s1-insert" "s2-update"
# verify that an in-progress multi-row insert blocks concurrent updates
permutation "s1-begin" "s1-multi-insert" "s2-update" "s1-commit"
# two multi-row inserts that hit same shards will block
permutation "s1-begin" "s1-multi-insert" "s2-multi-insert-overlap" "s1-commit"
# but concurrent multi-row inserts don't block unless shards overlap
permutation "s1-begin" "s2-begin" "s1-multi-insert" "s2-multi-insert" "s1-commit" "s2-commit"


@ -87,7 +87,7 @@ EXPLAIN (COSTS FALSE)
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0);
-- Test update
EXPLAIN (COSTS FALSE)


@ -1375,6 +1375,15 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
\.
ROLLBACK;
-- Similarly, multi-row INSERTs will take part in transactions and reuse connections...
BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
104,104
\.
INSERT INTO raw_events_first (user_id, value_1) VALUES (105, 105), (106, 106);
ROLLBACK;
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES


@ -132,7 +132,7 @@ INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy
SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
-- commands with non-constant partition values are supported
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58);
@ -146,8 +146,26 @@ DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000);
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
-- multi-row inserts are supported
INSERT INTO limit_orders VALUES (12037, 'GOOG', 5634, '2001-04-16 03:37:28', 'buy', 0.50),
(12038, 'GOOG', 5634, '2001-04-17 03:37:28', 'buy', 2.50),
(12039, 'GOOG', 5634, '2001-04-18 03:37:28', 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 12037 AND 12039;
-- even those with functions
INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50),
(22038, 'GOOG', 5634, now(), 'buy', 2.50),
(22039, 'GOOG', 5634, now(), 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039;
-- even those with functions in their partition columns
INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50),
(random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50),
(random() * 10 + 80090, 'GOOG', 5634, now(), 'buy', 1.50);
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000;
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported


@ -32,13 +32,22 @@ INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
-- replace a researcher, reusing their id
-- replace a researcher, reusing their id in a multi-row INSERT
BEGIN;
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
INSERT INTO researchers VALUES (2, 1, 'John Backus');
INSERT INTO researchers VALUES (2, 1, 'John Backus'), (12, 1, 'Frances E. Allen');
COMMIT;
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
SELECT name FROM researchers WHERE lab_id = 1 AND id % 10 = 2;
-- and the other way around
BEGIN;
INSERT INTO researchers VALUES (14, 2, 'Alan Kay'), (15, 2, 'Barbara Liskov');
DELETE FROM researchers WHERE id = 14 AND lab_id = 2;
ROLLBACK;
-- should have rolled everything back
SELECT * FROM researchers WHERE id = 15 AND lab_id = 2;
-- abort a modification
BEGIN;
@ -172,6 +181,15 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
\.
ROLLBACK;
-- but it is allowed after a multi-row insert
BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
3,1,Duth Knonald
10,6,Lesport Lampie
\.
ROLLBACK;
-- after a COPY you can modify multiple shards, since they'll use different connections
BEGIN;
\copy researchers from stdin delimiter ','


@ -76,8 +76,10 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000);
-- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable)
DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp;
-- commands with multiple rows are unsupported
INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT);
-- commands with multiple rows are supported
INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()),
(2038, 'GOOG', 5634, now(), 'buy', random()),
(2039, 'GOOG', 5634, now(), 'buy', random());
-- connect back to the other node
\c - - - :worker_1_port


@ -245,6 +245,17 @@ EXECUTE prepared_double_parameter_insert(4, 40);
EXECUTE prepared_double_parameter_insert(5, 50);
EXECUTE prepared_double_parameter_insert(6, 60);
PREPARE prepared_multi_insert(int, int) AS
INSERT INTO prepare_table (key, value) VALUES ($1, $2), ($1 + 1, $2 + 10);
-- execute 6 times to trigger prepared statement usage
EXECUTE prepared_multi_insert( 7, 70);
EXECUTE prepared_multi_insert( 9, 90);
EXECUTE prepared_multi_insert(11, 110);
EXECUTE prepared_multi_insert(13, 130);
EXECUTE prepared_multi_insert(15, 150);
EXECUTE prepared_multi_insert(17, 170);
PREPARE prepared_non_partition_parameter_insert(int) AS
INSERT INTO prepare_table (key, value) VALUES (0, $1);
@ -259,6 +270,8 @@ EXECUTE prepared_non_partition_parameter_insert(60);
-- check inserted values
SELECT * FROM prepare_table ORDER BY key, value;
SELECT master_modify_multiple_shards('DELETE FROM prepare_table WHERE value >= 70');
-- check router executor select
PREPARE prepared_router_partition_column_select(int) AS
SELECT


@ -16,7 +16,7 @@ SELECT master_create_distributed_table('upsert_test', 'part_key', 'hash');
SELECT master_create_worker_shards('upsert_test', '4', '2');
-- do a regular insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1);
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2);
-- observe that there is a conflict and the following query does nothing
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING;
@ -34,6 +34,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1)
-- see the results
SELECT * FROM upsert_test;
-- do a multi-row DO NOTHING insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2)
ON CONFLICT DO NOTHING;
-- do a multi-row DO UPDATE insert
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 10), (2, 20), (3, 30)
ON CONFLICT (part_key) DO
UPDATE SET other_col = EXCLUDED.other_col WHERE upsert_test.part_key != 1;
-- see the results
SELECT * FROM upsert_test ORDER BY part_key ASC;
DELETE FROM upsert_test WHERE part_key = 2;
DELETE FROM upsert_test WHERE part_key = 3;
-- use a WHERE clause, so that SET doesn't have an effect
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key)
DO UPDATE SET other_col = 30 WHERE upsert_test.other_col = 3;