First version of INSERT .. SELECT

pull/896/head
Onder Kalaci 2016-10-04 11:27:53 +03:00
parent 522fa9a299
commit 3bcd4ef83e
9 changed files with 694 additions and 36 deletions

View File

@ -152,14 +152,12 @@ static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHa
static void ErrorIfUnsupportedTableCombination(Query *queryTree);
static void ErrorIfUnsupportedUnionQuery(Query *unionQuery);
static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
static bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList);
static Query * LateralQuery(Query *query);
static bool SupportedLateralQuery(Query *parentQuery, Query *lateralQuery);
static bool JoinOnPartitionColumn(Query *query);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static List * RelationIdList(Query *query);
static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
@ -3318,7 +3316,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
* Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column.
*/
static bool
bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
{
bool isPartitionColumn = false;

View File

@ -150,8 +150,6 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
static StringInfo NodeNameArrayString(List *workerNodeList);
static StringInfo NodePortArrayString(List *workerNodeList);
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
RangeTableFragment *fragment);
@ -3939,7 +3937,7 @@ DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId)
* CreateBasicTask creates a task, initializes fields that are common to each task,
* and returns the created task.
*/
static Task *
Task *
CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString)
{
Task *task = CitusMakeNode(Task);

View File

@ -59,6 +59,24 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning)
{
originalQuery = copyObject(parse);
/*
* We implement INSERT INTO .. SELECT by pushing down the SELECT to
* each shard. That requires that the SELECT is co-located with the
* target table. To compute that we use the router planner, by adding
* a "hidden" constraint that the partition column be equal to a
* certain value. standard_planner() distributes that constraint to
* all affected table's baserestrictinfos. The router planner then
* iterates over the target table's shards, for each we replace the
* "hidden" restriction, with one that PruneShardList() handles, and
* then generate a query for that individual shard. If any of the
* involved tables don't prune down to a single shard, or if the
* pruned shards aren't colocated, we error out.
*/
if (InsertSelectQuery(parse))
{
AddHiddenPartitionColumnParameter(parse);
}
}
/* create a restriction context and put it at the end if context list */

View File

@ -19,12 +19,14 @@
#include "access/xact.h"
#include "distributed/citus_clauses.h"
#include "catalog/pg_type.h"
#include "distributed/colocation_utils.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
@ -46,6 +48,7 @@
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "parser/parsetree.h"
#include "parser/parse_oper.h"
#include "storage/lock.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
@ -65,6 +68,12 @@ typedef struct WalkerState
/* planner functions forward declarations */
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *
restrictionContext);
static MultiPlan * CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *
restrictionContext);
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
@ -82,6 +91,9 @@ static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext,
List **placementList);
static Query * RouterSelectQuery(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId);
static List * TargetShardIntervalsForSelect(Query *query,
RelationRestrictionContext *restrictionContext);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
@ -91,26 +103,26 @@ static bool UpdateRelationNames(Node *node,
static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
static bool InsertSelectQuery(Query *query);
static RelationRestrictionContext * copyRelationRestrictionContext(
RelationRestrictionContext *oldContext);
static Node * ReplaceHiddenParameter(Node *node, void *context);
static Var * MakeInt4Column();
static Const * MakeInt4Constant(Datum constantValue);
static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree);
static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query);
static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query);
static void AddHiddenParameterToFirstTableRecursively(Query *query);
/*
* MultiRouterPlanCreate creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single shard. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then the function
* returns NULL.
* MultiRouterPlanCreate
*/
MultiPlan *
MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext)
{
Task *task = NULL;
Job *job = NULL;
MultiPlan *multiPlan = NULL;
CmdType commandType = query->commandType;
bool modifyTask = false;
List *placementList = NIL;
bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType,
restrictionContext);
@ -119,6 +131,36 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
return NULL;
}
if (InsertSelectQuery(query))
{
multiPlan = CreateMultiTaskRouterPlan(originalQuery, query, restrictionContext);
}
else
{
multiPlan = CreateSingleTaskRouterPlan(originalQuery, query, restrictionContext);
}
return multiPlan;
}
/*
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns
* query results from a single shard. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then the function
* returns NULL.
*/
static MultiPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
CmdType commandType = query->commandType;
bool modifyTask = false;
Job *job = NULL;
Task *task = NULL;
List *placementList = NIL;
MultiPlan *multiPlan = NULL;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
@ -156,6 +198,396 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
}
/*
* Creates a router plan for INSERT ... SELECT queries which can consists of
* multiple tasks.
*
* The function never returns NULL, it errors out if cannot create the multi plan.
*/
static MultiPlan *
CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
int shardOffset = 0;
int shardCount = cacheEntry->shardIntervalArrayLength;
List *sqlTaskList = NIL;
uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
Job *workerJob = NULL;
uint64 jobId = INVALID_JOB_ID;
MultiPlan *multiPlan = NULL;
/*
* Error semantics for INSERT ... SELECT queries are different than regular
* modify queries. Thus, handle separately.
*/
ErrorIfInsertSelectQueryNotSupported(originalQuery);
/*
* Plan select query for each shard in the target table. Do so by
* replacing the magic parameters added in multi_planner() with actual
* current shard's boundary values. Then perform the normal shard
* pruning.
*/
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
{
Query *subquery = ((RangeTblEntry *) list_nth(query->rtable, 1))->subquery;
Query *copiedOriginal = copyObject(originalQuery);
Query *originalSubquery = ((RangeTblEntry *) list_nth(copiedOriginal->rtable,
1))->subquery;
RelationRestrictionContext *copiedRestrictionContext =
copyRelationRestrictionContext(restrictionContext);
ShardInterval *shardInterval =
cacheEntry->sortedShardIntervalArray[shardOffset];
uint64 shardId = shardInterval->shardId;
StringInfo queryString = makeStringInfo();
ListCell *restrictionCell = NULL;
Query *routerQuery = NULL;
Task *sqlTask = NULL;
List *selectPlacementList = NIL;
uint64 selectAnchorShardId = INVALID_SHARD_ID;
List *insertShardPlacementList = NULL;
List *intersectedPlacementList = NULL;
/*
* Replace the magic value in all baserestrictinfos. Note that
* this has to be done on a copy, as the walker modifies in place.
*/
foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList)
{
RelationRestriction *restriction = lfirst(restrictionCell);
restriction->relOptInfo->baserestrictinfo = (List *)
ReplaceHiddenParameter(
(Node *) restriction->relOptInfo->baserestrictinfo,
shardInterval);
}
/*
* Use select planner to generate query for this specific
* shard. We don't use the generated query, just rely on the
* side-effect that all RTEs have been updated to point to the
* relevant nodes.
*/
routerQuery = RouterSelectQuery(originalSubquery, subquery,
copiedRestrictionContext, &selectPlacementList,
&selectAnchorShardId);
if (routerQuery == NULL)
{
elog(ERROR, "couldn't prune down sufficiently for insert pushdown");
}
/* Ensure that we have INSERTed table's placement exists on the same worker */
insertShardPlacementList = ShardPlacementList(shardId);
intersectedPlacementList = IntersectPlacementList(insertShardPlacementList,
selectPlacementList);
if (list_length(insertShardPlacementList) != list_length(
intersectedPlacementList))
{
ereport(DEBUG2, (errmsg("insert table does not have the same placements on "
"the select placement list. Skipping this task")));
continue;
}
ReorderInsertSelectTargetListsIfExists(copiedOriginal);
/* and generate the full query string */
deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId,
queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
sqlTask = CreateBasicTask(jobId, taskIdIndex++, SQL_TASK, queryString->data);
sqlTask->dependedTaskList = NULL;
sqlTask->anchorShardId = shardId;
sqlTask->taskPlacementList = insertShardPlacementList;
sqlTaskList = lappend(sqlTaskList, sqlTask);
}
/* Create the worker job */
workerJob = CitusMakeNode(Job);
workerJob->taskList = sqlTaskList;
workerJob->subqueryPushdown = false;
workerJob->dependedJobList = NIL;
workerJob->jobId = jobId;
workerJob->jobQuery = originalQuery;
workerJob->requiresMasterEvaluation = false; /* for now we do not support any function evaluation */
/* and finally the multi plan */
multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = workerJob;
multiPlan->masterTableName = NULL;
multiPlan->masterQuery = NULL;
return multiPlan;
}
/*
* ErrorIfInsertSelectQueryNotSupported errors out for unsupported
* INSERT ... SELECT queries.
*/
static void
ErrorIfInsertSelectQueryNotSupported(Query *queryTree)
{
RangeTblEntry *insertRte = NULL;
RangeTblEntry *subqueryRte = NULL;
Query *subquery = NULL;
Oid insertRelationId = InvalidOid;
/* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree));
insertRte = linitial(queryTree->rtable);
subqueryRte = lsecond(queryTree->rtable);
subquery = subqueryRte->subquery;
insertRelationId = insertRte->relid;
/* we support this feature only for colocated tables */
ErrorIfNotAllParticipatingTablesAreColocated(queryTree);
if (contain_mutable_functions((Node *) queryTree))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("non-IMMUTABLE functions are not allowed in INSERT ... "
"SELECT queries")));
}
if (subquery->limitCount != NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("LIMIT clause are not allowed in INSERT ... SELECT "
"queries")));
}
if (subquery->limitOffset != NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("OFFSET clause are not allowed in INSERT ... SELECT "
"queries")));
}
/*TODO: check with Andres. Should we allow on partition column? I'm cool with not having window functions */
if (subquery->windowClause != NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Window functions are not allowed in INSERT ... SELECT "
"queries")));
}
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree);
}
/*
* ErrorIfNotAllParticipatingTablesAreColocated errors out of all tables
* referenced in the query are not colocated.
*/
static void
ErrorIfNotAllParticipatingTablesAreColocated(Query *query)
{
List *relationIdList = RelationIdList(query);
ListCell *relationIdCell = NULL;
uint64 colocationId = INVALID_COLOCATION_ID;
bool tablesAreColocated = true;
foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
uint64 currentColocationId = TableColocationId(relationId);
if (currentColocationId == INVALID_COLOCATION_ID)
{
tablesAreColocated = false;
break;
}
/* set for the first table */
if (colocationId == INVALID_COLOCATION_ID)
{
colocationId = currentColocationId;
}
if (colocationId != currentColocationId)
{
tablesAreColocated = false;
break;
}
}
if (!tablesAreColocated)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("all participating tables should be colocated")));
}
return;
}
/*
* ErrorIfInsertPartitionColumnDoesNotMatchSelect checks whether the INSERTed table's
* partition column value matches with the any of the SELECTed table's partition column.
*/
static void
ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query)
{
ListCell *targetEntryCell = NULL;
uint32 rangeTableId = 1;
RangeTblEntry *insertRte = linitial(query->rtable);
RangeTblEntry *subqueryRte = lsecond(query->rtable);
Query *subquery = subqueryRte->subquery;
Oid insertRelationId = insertRte->relid;
Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId);
bool partitionColumnsMatch = false;
foreach(targetEntryCell, query->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
if (IsA(targetEntry->expr, Var))
{
Var *insertVar = (Var *) targetEntry->expr;
AttrNumber originalAttrNo = get_attnum(insertRelationId,
targetEntry->resname);
TargetEntry *subqeryTargetEntry = NULL;
if (originalAttrNo != insertPartitionColumn->varattno)
{
continue;
}
subqeryTargetEntry = list_nth(subquery->targetList,
insertVar->varattno - 1);
if (!IsA(subqeryTargetEntry->expr, Var))
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery))
{
partitionColumnsMatch = false;
break;
}
partitionColumnsMatch = true;
break;
}
}
if (!partitionColumnsMatch)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SELECT query should return bare partition column on "
"the same ordinal position with INSERT query's partition "
"column")));
}
}
/*
* AddHiddenPartitionColumnParameter() can only be used with
* INSERT ... SELECT queries. We add this hidden parameter to
* recursively for subqueries.
*
* If the input query is not INSERT .. SELECT the function errors-out.
*/
void
AddHiddenPartitionColumnParameter(Query *originalQuery)
{
Query *subquery = NULL;
RangeTblEntry *subqueryEntry = NULL;
if (!InsertSelectQuery(originalQuery))
{
elog(ERROR, "Only INSERT .. SELECT queries can be modified");
}
subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1);
subquery = subqueryEntry->subquery;
AddHiddenParameterToFirstTableRecursively(subquery);
}
/*
* AddHiddenParameterToFirstTableRecursively adds a hidden parameter
* ($1 = partitionColumn) for the first table on the query.
*/
static void
AddHiddenParameterToFirstTableRecursively(Query *query)
{
Param *hiddenParam = makeNode(Param);
Node *hiddenBound = NULL;
Oid firstTableId = ExtractFirstDistributedTableId(query);
Var *partitionColumn = PartitionColumn(firstTableId, 1);
Oid partitionColumnCollid = partitionColumn->varcollid;
Oid lessThanOperator = InvalidOid;
Oid equalsOperator = InvalidOid;
Oid greaterOperator = InvalidOid;
bool hashable = false;
List *subqueryEntryList = NIL;
ListCell *rangeTableEntryCell = NULL;
AssertArg(query->commandType == CMD_SELECT);
hiddenParam->paramkind = PARAM_EXTERN;
hiddenParam->paramid = HIDDEN_PARAMETER_ID;
hiddenParam->paramtype = partitionColumn->vartype;
hiddenParam->paramtypmod = partitionColumn->vartypmod;
hiddenParam->paramcollid = partitionColumnCollid;
hiddenParam->location = -1;
get_sort_group_operators(partitionColumn->vartype, true, true, true,
&lessThanOperator, &equalsOperator, &greaterOperator,
&hashable);
/*
* XXX: Using an equality constraint here isn't exactly correct,
* might want to replace it with >= and <=.
*
* It looks like this works.
*/
hiddenBound = (Node *)
make_opclause(equalsOperator, InvalidOid, false,
(Expr *) hiddenParam, (Expr *) partitionColumn,
partitionColumnCollid, partitionColumnCollid);
/* add restriction on partition column */
if (query->jointree->quals == NULL)
{
query->jointree->quals = hiddenBound;
}
else
{
query->jointree->quals = make_and_qual(query->jointree->quals,
hiddenBound);
}
/* recursively do same addition for subqueries of this query */
subqueryEntryList = SubqueryEntryList(query);
foreach(rangeTableEntryCell, subqueryEntryList)
{
RangeTblEntry *rangeTableEntry =
(RangeTblEntry *) lfirst(rangeTableEntryCell);
Query *innerSubquery = rangeTableEntry->subquery;
AddHiddenParameterToFirstTableRecursively(innerSubquery);
}
}
/*
* ErrorIfModifyQueryNotSupported checks if the query contains unsupported features,
* and errors out if it does.
@ -1039,11 +1471,44 @@ RouterSelectTask(Query *originalQuery, Query *query,
List **placementList)
{
Task *task = NULL;
StringInfo queryString = makeStringInfo();
bool upsertQuery = false;
uint64 shardId = INVALID_SHARD_ID;
originalQuery = RouterSelectQuery(originalQuery, query, restrictionContext,
placementList, &shardId);
if (originalQuery == NULL)
{
return NULL;
}
pg_get_query_def(originalQuery, queryString);
task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->taskType = ROUTER_TASK;
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
//task->requiresMasterEvaluation = false;
return task;
}
/* RouterSelectQuery builds a Task to represent a single shard select query */
static Query *
RouterSelectQuery(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId)
{
List *prunedRelationShardList = TargetShardIntervalsForSelect(query,
restrictionContext);
StringInfo queryString = makeStringInfo();
uint64 shardId = INVALID_SHARD_ID;
bool upsertQuery = false;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
ListCell *prunedRelationShardListCell = NULL;
List *workerList = NIL;
@ -1115,20 +1580,10 @@ RouterSelectTask(Query *originalQuery, Query *query,
UpdateRelationNames((Node *) originalQuery, restrictionContext);
pg_get_query_def(originalQuery, queryString);
task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->taskType = ROUTER_TASK;
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
*placementList = workerList;
*anchorShardId = shardId;
return task;
return originalQuery;
}
@ -1167,6 +1622,11 @@ TargetShardIntervalsForSelect(Query *query,
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
bool whereFalseQuery = false;
/* elog(DEBUG2, "relation id: %d", relationId); */
/* elog(DEBUG2, "restrictClauseList-: %s", pretty_format_node_dump(nodeToString(restrictClauseList))); */
/* elog(DEBUG2, "join info-: %s", pretty_format_node_dump(nodeToString(relationRestriction->relOptInfo->joininfo))); */
relationRestriction->prunedShardIntervalList = NIL;
/*
@ -1718,7 +2178,7 @@ ReorderInsertSelectTargetListsIfExists(Query *originalQuery)
* InsertSelectQuery returns true when the input query
* is INSERT INTO ... SELECT kind of query.
*/
static bool
bool
InsertSelectQuery(Query *query)
{
CmdType commandType = query->commandType;
@ -1746,3 +2206,178 @@ InsertSelectQuery(Query *query)
return true;
}
/*
* Copy a RelationRestrictionContext. Note that several subfields are copied
* shallowly, for lack of copyObject support.
*/
static RelationRestrictionContext *
copyRelationRestrictionContext(RelationRestrictionContext *oldContext)
{
RelationRestrictionContext *newContext = (RelationRestrictionContext *)
palloc(sizeof(RelationRestrictionContext));
ListCell *relationRestrictionCell = NULL;
newContext->hasDistributedRelation = oldContext->hasDistributedRelation;
newContext->hasLocalRelation = oldContext->hasLocalRelation;
newContext->relationRestrictionList = NIL;
foreach(relationRestrictionCell, oldContext->relationRestrictionList)
{
RelationRestriction *oldRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
RelationRestriction *newRestriction = (RelationRestriction *)
palloc0(sizeof(RelationRestriction));
newRestriction->index = oldRestriction->index;
newRestriction->relationId = oldRestriction->relationId;
newRestriction->distributedRelation = oldRestriction->distributedRelation;
newRestriction->rte = copyObject(oldRestriction->rte);
/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */
newRestriction->relOptInfo = palloc(sizeof(RelOptInfo));
memcpy(newRestriction->relOptInfo, oldRestriction->relOptInfo,
sizeof(RelOptInfo));
newRestriction->relOptInfo->baserestrictinfo = copyObject(
oldRestriction->relOptInfo->baserestrictinfo);
/* not copyable, but readonly */
newRestriction->plannerInfo = oldRestriction->plannerInfo;
newRestriction->prunedShardIntervalList = copyObject(
oldRestriction->prunedShardIntervalList);
newContext->relationRestrictionList =
lappend(newContext->relationRestrictionList, newRestriction);
}
return newContext;
}
/*
* Replace the "hidden" partition restriction clause with the current shard's
* (passed in context) boundary value.
*/
static Node *
ReplaceHiddenParameter(Node *node, void *context)
{
ShardInterval *shardInterval = (ShardInterval *) context;
Assert(shardInterval->minValueExists);
Assert(shardInterval->maxValueExists);
if (node == NULL)
{
return NULL;
}
if (IsA(node, OpExpr))
{
OpExpr *op = (OpExpr *) node;
if (list_length(op->args) == 2)
{
Node *leftop = get_leftop((Expr *) op);
Node *rightop = get_rightop((Expr *) op);
Param *param = NULL;
/*
* TODO: do we really need Var? Postgres replaces Var with Const in case we already have the same
* Var in the restrictInfo
* */
if (IsA(leftop, Param))/* && IsA(rightop, Var)) */
{
param = (Param *) leftop;
}
else if (IsA(rightop, Param)) //IsA(leftop, Var))/* &&) */
{
param = (Param *) rightop;
}
/*
* Found hidden op, replace with appropriate boundaries for the
* current shard interval.
*/
if (param && param->paramid == HIDDEN_PARAMETER_ID)
{
Var *hashedColumn = NULL;
OpExpr *hashedOperatorExpression = NULL;
hashedColumn = MakeInt4Column();
hashedOperatorExpression = (OpExpr *)
make_opclause(96,
InvalidOid,
false, /* no return set */
(Expr *) hashedColumn,
(Expr *) MakeInt4Constant(
shardInterval->maxValue),
InvalidOid, InvalidOid);
hashedOperatorExpression->opfuncid = get_opcode(
hashedOperatorExpression->opno);
hashedOperatorExpression->opresulttype = get_func_rettype(
hashedOperatorExpression->opfuncid);
return (Node *) hashedOperatorExpression;
}
}
}
if (IsA(node, Query))
{
/* FIXME: probably can remove support for this */
/* to support CTEs, subqueries, etc */
return (Node *) query_tree_mutator((Query *) node,
ReplaceHiddenParameter,
context,
QTW_EXAMINE_RTES);
}
else if (IsA(node, RestrictInfo))
{
RestrictInfo *restrictInfo = (RestrictInfo *) node;
restrictInfo->clause = (Expr *) ReplaceHiddenParameter(
(Node *) restrictInfo->clause, context);
return (Node *) restrictInfo;
}
return expression_tree_mutator(node, ReplaceHiddenParameter, context);
}
/*
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
*/
static Var *
MakeInt4Column()
{
Index tableId = 0;
AttrNumber columnAttributeNumber = RESERVED_HASHED_COLUMN_ID;
Oid columnType = INT4OID;
int32 columnTypeMod = -1;
Oid columnCollationOid = InvalidOid;
Index columnLevelSup = 0;
Var *int4Column = makeVar(tableId, columnAttributeNumber, columnType,
columnTypeMod, columnCollationOid, columnLevelSup);
return int4Column;
}
/*
* MakeInt4Constant creates a new constant of int4 type and assigns the given
* value as a constant value.
*/
static Const *
MakeInt4Constant(Datum constantValue)
{
Oid constantType = INT4OID;
int32 constantTypeMode = -1;
Oid constantCollationId = InvalidOid;
int constantLength = sizeof(int32);
bool constantIsNull = false;
bool constantByValue = true;
Const *int4Constant = makeConst(constantType, constantTypeMode, constantCollationId,
constantLength, constantValue, constantIsNull,
constantByValue);
return int4Constant;
}

View File

@ -122,6 +122,8 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
extern List * RelationIdList(Query *query);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -227,6 +227,8 @@ extern int TaskAssignmentPolicy;
/* Function declarations for building physical plans and constructing queries */
extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree);
extern StringInfo ShardFetchQueryString(uint64 shardId);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
/* Function declarations for shard pruning */
extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList,

View File

@ -21,6 +21,9 @@
#include "nodes/parsenodes.h"
/* reserved parameted id */
#define HIDDEN_PARAMETER_ID 0xdeadbeef
/* reserved alias name for UPSERTs */
#define UPSERT_ALIAS "citus_table_alias"
@ -28,7 +31,9 @@
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
extern void AddHiddenPartitionColumnParameter(Query *originalQuery);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery);
extern bool InsertSelectQuery(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -205,10 +205,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders;
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
-- INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;

View File

@ -150,8 +150,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
-- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders;
-- INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)