mirror of https://github.com/citusdata/citus.git
Merge pull request #1483 from citusdata/update_subquery_on_where_false
Add support for router UPDATEs and DELETEs with subqueries and joins
pull/1542/head
commit
a650aaa631
|
@ -412,7 +412,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
DeferredErrorMessage *planningError = NULL;
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
|
||||||
/* need to perform shard pruning, rebuild the task list from scratch */
|
/* need to perform shard pruning, rebuild the task list from scratch */
|
||||||
taskList = RouterModifyTaskList(jobQuery, &planningError);
|
taskList = RouterInsertTaskList(jobQuery, &planningError);
|
||||||
|
|
||||||
if (planningError != NULL)
|
if (planningError != NULL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -150,7 +150,9 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
if (modifyQuery->commandType != CMD_UTILITY)
|
if (modifyQuery->commandType != CMD_UTILITY)
|
||||||
{
|
{
|
||||||
DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery);
|
bool multiShardQuery = true;
|
||||||
|
DeferredErrorMessage *error = ModifyQuerySupported(modifyQuery, multiShardQuery);
|
||||||
|
|
||||||
if (error)
|
if (error)
|
||||||
{
|
{
|
||||||
RaiseDeferredError(error, ERROR);
|
RaiseDeferredError(error, ERROR);
|
||||||
|
|
|
@ -90,11 +90,25 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deparse_shard_query(query, relationId, task->anchorShardId,
|
/*
|
||||||
newQueryString);
|
* For INSERT queries, we only have one relation to update, so we can
|
||||||
|
* use deparse_shard_query(). For UPDATE and DELETE queries, we may have
|
||||||
|
* subqueries and joins, so we use relation shard list to update shard
|
||||||
|
* names and call pg_get_query_def() directly.
|
||||||
|
*/
|
||||||
|
if (query->commandType == CMD_INSERT)
|
||||||
|
{
|
||||||
|
deparse_shard_query(query, relationId, task->anchorShardId, newQueryString);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
List *relationShardList = task->relationShardList;
|
||||||
|
UpdateRelationToShardNames((Node *) query, relationShardList);
|
||||||
|
|
||||||
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
pg_get_query_def(query, newQueryString);
|
||||||
newQueryString->data)));
|
}
|
||||||
|
|
||||||
|
ereport(DEBUG4, (errmsg("distributed statement: %s", newQueryString->data)));
|
||||||
|
|
||||||
task->queryString = newQueryString->data;
|
task->queryString = newQueryString->data;
|
||||||
}
|
}
|
||||||
|
|
|
@ -437,12 +437,12 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
List *insertShardPlacementList = NULL;
|
List *insertShardPlacementList = NULL;
|
||||||
List *intersectedPlacementList = NULL;
|
List *intersectedPlacementList = NULL;
|
||||||
bool routerPlannable = false;
|
|
||||||
bool upsertQuery = false;
|
bool upsertQuery = false;
|
||||||
bool replacePrunedQueryWithDummy = false;
|
bool replacePrunedQueryWithDummy = false;
|
||||||
bool allReferenceTables = restrictionContext->allReferenceTables;
|
bool allReferenceTables = restrictionContext->allReferenceTables;
|
||||||
List *shardOpExpressions = NIL;
|
List *shardOpExpressions = NIL;
|
||||||
RestrictInfo *shardRestrictionList = NULL;
|
RestrictInfo *shardRestrictionList = NULL;
|
||||||
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
|
||||||
/* grab shared metadata lock to stop concurrent placement additions */
|
/* grab shared metadata lock to stop concurrent placement additions */
|
||||||
LockShardDistributionMetadata(shardId, ShareLock);
|
LockShardDistributionMetadata(shardId, ShareLock);
|
||||||
|
@ -489,15 +489,15 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
replacePrunedQueryWithDummy = false;
|
replacePrunedQueryWithDummy = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Use router select planner to decide on whether we can push down the query
|
* Use router planner to decide on whether we can push down the query or not.
|
||||||
* or not. If we can, we also rely on the side-effects that all RTEs have been
|
* If we can, we also rely on the side-effects that all RTEs have been updated
|
||||||
* updated to point to the relevant nodes and selectPlacementList is determined.
|
* to point to the relevant nodes and selectPlacementList is determined.
|
||||||
*/
|
*/
|
||||||
routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext,
|
planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext,
|
||||||
&selectPlacementList, &selectAnchorShardId,
|
&selectPlacementList, &selectAnchorShardId,
|
||||||
&relationShardList, replacePrunedQueryWithDummy);
|
&relationShardList, replacePrunedQueryWithDummy);
|
||||||
|
|
||||||
if (!routerPlannable)
|
if (planningError)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot perform distributed planning for the given "
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
|
|
@ -2291,12 +2291,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
|
||||||
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
||||||
List *relationShardList = NIL;
|
List *relationShardList = NIL;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
bool routerPlannable = false;
|
|
||||||
bool replacePrunedQueryWithDummy = false;
|
bool replacePrunedQueryWithDummy = false;
|
||||||
RelationRestrictionContext *copiedRestrictionContext =
|
RelationRestrictionContext *copiedRestrictionContext =
|
||||||
CopyRelationRestrictionContext(restrictionContext);
|
CopyRelationRestrictionContext(restrictionContext);
|
||||||
List *shardOpExpressions = NIL;
|
List *shardOpExpressions = NIL;
|
||||||
RestrictInfo *shardRestrictionList = NULL;
|
RestrictInfo *shardRestrictionList = NULL;
|
||||||
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
|
||||||
/* such queries should go through router planner */
|
/* such queries should go through router planner */
|
||||||
Assert(!restrictionContext->allReferenceTables);
|
Assert(!restrictionContext->allReferenceTables);
|
||||||
|
@ -2330,12 +2330,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
|
||||||
* or not. If we can, we also rely on the side-effects that all RTEs have been
|
* or not. If we can, we also rely on the side-effects that all RTEs have been
|
||||||
* updated to point to the relevant nodes and selectPlacementList is determined.
|
* updated to point to the relevant nodes and selectPlacementList is determined.
|
||||||
*/
|
*/
|
||||||
routerPlannable = RouterSelectQuery(taskQuery, copiedRestrictionContext,
|
planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext,
|
||||||
&selectPlacementList, &selectAnchorShardId,
|
&selectPlacementList, &selectAnchorShardId,
|
||||||
&relationShardList, replacePrunedQueryWithDummy);
|
&relationShardList, replacePrunedQueryWithDummy);
|
||||||
|
|
||||||
/* we don't expect to see this error, but keep the check as a precaution for future changes */
|
/* we don't expect to see this error, but keep the check as a precaution for future changes */
|
||||||
if (!routerPlannable)
|
if (planningError)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot perform distributed planning for the given "
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
|
|
@ -93,33 +93,30 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
|
||||||
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
|
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
|
||||||
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
||||||
FromExpr *joinTree);
|
FromExpr *joinTree);
|
||||||
static Job * RouterModifyJob(Query *originalQuery, Query *query,
|
static Job * RouterInsertJob(Query *originalQuery, Query *query,
|
||||||
DeferredErrorMessage **planningError);
|
DeferredErrorMessage **planningError);
|
||||||
static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry);
|
static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry);
|
||||||
static bool CanShardPrune(Oid distributedTableId, Query *query);
|
static bool CanShardPrune(Oid distributedTableId, Query *query);
|
||||||
static List * RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
|
|
||||||
DeferredErrorMessage **planningError);
|
|
||||||
static List * RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
|
|
||||||
DeferredErrorMessage **planningError);
|
|
||||||
static Job * CreateJob(Query *query);
|
static Job * CreateJob(Query *query);
|
||||||
static Task * CreateTask(TaskType taskType);
|
static Task * CreateTask(TaskType taskType);
|
||||||
static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
|
static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
|
||||||
DeferredErrorMessage **planningError);
|
DeferredErrorMessage **planningError);
|
||||||
static ShardInterval * FindShardForUpdateOrDelete(Query *query,
|
|
||||||
DistTableCacheEntry *cacheEntry,
|
|
||||||
DeferredErrorMessage **planningError);
|
|
||||||
static List * QueryRestrictList(Query *query, char partitionMethod);
|
|
||||||
static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
||||||
static Job * RouterSelectJob(Query *originalQuery,
|
static Job * RouterJob(Query *originalQuery,
|
||||||
RelationRestrictionContext *restrictionContext,
|
RelationRestrictionContext *restrictionContext,
|
||||||
bool *queryRoutable);
|
DeferredErrorMessage **planningError);
|
||||||
static bool RelationPrunesToMultipleShards(List *relationShardList);
|
static bool RelationPrunesToMultipleShards(List *relationShardList);
|
||||||
static List * TargetShardIntervalsForSelect(Query *query,
|
static List * TargetShardIntervalsForRouter(Query *query,
|
||||||
RelationRestrictionContext *restrictionContext);
|
RelationRestrictionContext *restrictionContext,
|
||||||
|
bool *multiShardQuery);
|
||||||
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
||||||
static bool MultiRouterPlannableQuery(Query *query,
|
static bool MultiRouterPlannableQuery(Query *query,
|
||||||
RelationRestrictionContext *restrictionContext);
|
RelationRestrictionContext *restrictionContext);
|
||||||
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
||||||
|
static bool UpdateOrDeleteQuery(Query *query);
|
||||||
|
static RangeTblEntry * GetUpdateOrDeleteRTE(List *rangeTableList);
|
||||||
|
static bool UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry);
|
||||||
|
static bool SelectsFromDistributedTable(List *rangeTableList);
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
static List * get_all_actual_clauses(List *restrictinfo_list);
|
static List * get_all_actual_clauses(List *restrictinfo_list);
|
||||||
#endif
|
#endif
|
||||||
|
@ -161,16 +158,28 @@ CreateModifyPlan(Query *originalQuery, Query *query,
|
||||||
{
|
{
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
|
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
|
||||||
|
bool multiShardQuery = false;
|
||||||
|
|
||||||
multiPlan->operation = query->commandType;
|
multiPlan->operation = query->commandType;
|
||||||
|
|
||||||
multiPlan->planningError = ModifyQuerySupported(query);
|
multiPlan->planningError = ModifyQuerySupported(query, multiShardQuery);
|
||||||
if (multiPlan->planningError != NULL)
|
if (multiPlan->planningError != NULL)
|
||||||
{
|
{
|
||||||
return multiPlan;
|
return multiPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
job = RouterModifyJob(originalQuery, query, &multiPlan->planningError);
|
if (UpdateOrDeleteQuery(query))
|
||||||
|
{
|
||||||
|
RelationRestrictionContext *restrictionContext =
|
||||||
|
plannerRestrictionContext->relationRestrictionContext;
|
||||||
|
|
||||||
|
job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
job = RouterInsertJob(originalQuery, query, &multiPlan->planningError);
|
||||||
|
}
|
||||||
|
|
||||||
if (multiPlan->planningError != NULL)
|
if (multiPlan->planningError != NULL)
|
||||||
{
|
{
|
||||||
return multiPlan;
|
return multiPlan;
|
||||||
|
@ -204,7 +213,6 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
RelationRestrictionContext *restrictionContext)
|
RelationRestrictionContext *restrictionContext)
|
||||||
{
|
{
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
bool queryRoutable = false;
|
|
||||||
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
|
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
|
||||||
|
|
||||||
multiPlan->operation = query->commandType;
|
multiPlan->operation = query->commandType;
|
||||||
|
@ -216,8 +224,8 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
return multiPlan;
|
return multiPlan;
|
||||||
}
|
}
|
||||||
|
|
||||||
job = RouterSelectJob(originalQuery, restrictionContext, &queryRoutable);
|
job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError);
|
||||||
if (!queryRoutable)
|
if (multiPlan->planningError)
|
||||||
{
|
{
|
||||||
/* query cannot be handled by this planner */
|
/* query cannot be handled by this planner */
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -455,7 +463,7 @@ ExtractInsertRangeTableEntry(Query *query)
|
||||||
* features, otherwise it returns an error description.
|
* features, otherwise it returns an error description.
|
||||||
*/
|
*/
|
||||||
DeferredErrorMessage *
|
DeferredErrorMessage *
|
||||||
ModifyQuerySupported(Query *queryTree)
|
ModifyQuerySupported(Query *queryTree, bool multiShardQuery)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
|
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
|
@ -479,9 +487,18 @@ ModifyQuerySupported(Query *queryTree)
|
||||||
*/
|
*/
|
||||||
if (queryTree->hasSubLinks == true)
|
if (queryTree->hasSubLinks == true)
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
/*
|
||||||
"subqueries are not supported in distributed modifications",
|
* We support UPDATE and DELETE with subqueries unless they are multi
|
||||||
NULL, NULL);
|
* shard queries.
|
||||||
|
*/
|
||||||
|
if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot perform distributed planning for the given "
|
||||||
|
"modifications",
|
||||||
|
"Subqueries are not supported in distributed "
|
||||||
|
"modifications.", NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reject queries which include CommonTableExpr */
|
/* reject queries which include CommonTableExpr */
|
||||||
|
@ -542,6 +559,17 @@ ModifyQuerySupported(Query *queryTree)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
char *rangeTableEntryErrorDetail = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We support UPDATE and DELETE with subqueries and joins unless
|
||||||
|
* they are multi shard queries.
|
||||||
|
*/
|
||||||
|
if (UpdateOrDeleteQuery(queryTree) && !multiShardQuery)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Error out for rangeTableEntries that we do not support.
|
* Error out for rangeTableEntries that we do not support.
|
||||||
* We do not explicitly specify "in FROM clause" in the error detail
|
* We do not explicitly specify "in FROM clause" in the error detail
|
||||||
|
@ -549,7 +577,6 @@ ModifyQuerySupported(Query *queryTree)
|
||||||
* We do not need to check for RTE_CTE because all common table expressions
|
* We do not need to check for RTE_CTE because all common table expressions
|
||||||
* are rejected above with queryTree->cteList check.
|
* are rejected above with queryTree->cteList check.
|
||||||
*/
|
*/
|
||||||
char *rangeTableEntryErrorDetail = NULL;
|
|
||||||
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
|
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
|
||||||
{
|
{
|
||||||
rangeTableEntryErrorDetail = "Subqueries are not supported in"
|
rangeTableEntryErrorDetail = "Subqueries are not supported in"
|
||||||
|
@ -585,12 +612,18 @@ ModifyQuerySupported(Query *queryTree)
|
||||||
*/
|
*/
|
||||||
if (commandType != CMD_INSERT && queryTableCount != 1)
|
if (commandType != CMD_INSERT && queryTableCount != 1)
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
/*
|
||||||
"cannot perform distributed planning for the given"
|
* We support UPDATE and DELETE with joins unless they are multi shard
|
||||||
" modification",
|
* queries.
|
||||||
"Joins are not supported in distributed "
|
*/
|
||||||
"modifications.",
|
if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery)
|
||||||
NULL);
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot perform distributed planning for the given "
|
||||||
|
"modification",
|
||||||
|
"Joins are not supported in distributed "
|
||||||
|
"modifications.", NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reject queries which involve multi-row inserts */
|
/* reject queries which involve multi-row inserts */
|
||||||
|
@ -794,6 +827,24 @@ ModifyQuerySupported(Query *queryTree)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command.
|
||||||
|
* If it is, it returns true otherwise it returns false.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
UpdateOrDeleteQuery(Query *query)
|
||||||
|
{
|
||||||
|
CmdType commandType = query->commandType;
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the expression contains STABLE functions which accept any parameters derived from a
|
* If the expression contains STABLE functions which accept any parameters derived from a
|
||||||
* Var returns true and sets varArgument.
|
* Var returns true and sets varArgument.
|
||||||
|
@ -1013,12 +1064,12 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterModifyJob builds a Job to represent a modification performed by
|
* RouterInsertJob builds a Job to represent an insertion performed by
|
||||||
* the provided query against the provided shard interval. This task contains
|
* the provided query against the provided shard interval. This task contains
|
||||||
* shard-extended deparsed SQL to be run during execution.
|
* shard-extended deparsed SQL to be run during execution.
|
||||||
*/
|
*/
|
||||||
static Job *
|
static Job *
|
||||||
RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
|
RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
||||||
List *taskList = NIL;
|
List *taskList = NIL;
|
||||||
|
@ -1037,7 +1088,7 @@ RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
taskList = RouterModifyTaskList(query, planningError);
|
taskList = RouterInsertTaskList(query, planningError);
|
||||||
if (*planningError)
|
if (*planningError)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1118,36 +1169,6 @@ CanShardPrune(Oid distributedTableId, Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterModifyTaskList builds a list of tasks for a given query.
|
|
||||||
*/
|
|
||||||
List *
|
|
||||||
RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError)
|
|
||||||
{
|
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
|
||||||
CmdType commandType = query->commandType;
|
|
||||||
List *taskList = NIL;
|
|
||||||
|
|
||||||
ErrorIfNoShardsExist(cacheEntry);
|
|
||||||
|
|
||||||
if (commandType == CMD_INSERT)
|
|
||||||
{
|
|
||||||
taskList = RouterInsertTaskList(query, cacheEntry, planningError);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
taskList = RouterUpdateOrDeleteTaskList(query, cacheEntry, planningError);
|
|
||||||
if (*planningError)
|
|
||||||
{
|
|
||||||
return NIL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return taskList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfNoShardsExist throws an error if the given table has no shards.
|
* ErrorIfNoShardsExist throws an error if the given table has no shards.
|
||||||
*/
|
*/
|
||||||
|
@ -1174,13 +1195,17 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
|
||||||
* RouterInsertTaskList generates a list of tasks for performing an INSERT on
|
* RouterInsertTaskList generates a list of tasks for performing an INSERT on
|
||||||
* a distributed table via the router executor.
|
* a distributed table via the router executor.
|
||||||
*/
|
*/
|
||||||
static List *
|
List *
|
||||||
RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
|
RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
|
||||||
DeferredErrorMessage **planningError)
|
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
Task *modifyTask = NULL;
|
Task *modifyTask = NULL;
|
||||||
|
|
||||||
|
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
||||||
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
|
ErrorIfNoShardsExist(cacheEntry);
|
||||||
|
|
||||||
Assert(query->commandType == CMD_INSERT);
|
Assert(query->commandType == CMD_INSERT);
|
||||||
|
|
||||||
shardInterval = FindShardForInsert(query, cacheEntry, planningError);
|
shardInterval = FindShardForInsert(query, cacheEntry, planningError);
|
||||||
|
@ -1206,41 +1231,6 @@ RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterUpdateOrDeleteTaskList returns a list of tasks for executing an UPDATE
|
|
||||||
* or DELETE command on a distributed table via the router executor.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
|
|
||||||
DeferredErrorMessage **planningError)
|
|
||||||
{
|
|
||||||
ShardInterval *shardInterval = NULL;
|
|
||||||
List *taskList = NIL;
|
|
||||||
|
|
||||||
Assert(query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE);
|
|
||||||
|
|
||||||
shardInterval = FindShardForUpdateOrDelete(query, cacheEntry, planningError);
|
|
||||||
|
|
||||||
if (*planningError != NULL)
|
|
||||||
{
|
|
||||||
return NIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shardInterval != NULL)
|
|
||||||
{
|
|
||||||
Task *modifyTask = NULL;
|
|
||||||
|
|
||||||
modifyTask = CreateTask(MODIFY_TASK);
|
|
||||||
modifyTask->anchorShardId = shardInterval->shardId;
|
|
||||||
modifyTask->replicationModel = cacheEntry->replicationModel;
|
|
||||||
|
|
||||||
taskList = lappend(taskList, modifyTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
return taskList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateTask returns a new Task with the given type.
|
* CreateTask returns a new Task with the given type.
|
||||||
*/
|
*/
|
||||||
|
@ -1400,109 +1390,6 @@ FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* FindShardForUpdateOrDelete finds the shard interval in which an UPDATE or
|
|
||||||
* DELETE command should be applied, or sets planningError when the query
|
|
||||||
* needs to be applied to multiple or no shards.
|
|
||||||
*/
|
|
||||||
static ShardInterval *
|
|
||||||
FindShardForUpdateOrDelete(Query *query, DistTableCacheEntry *cacheEntry,
|
|
||||||
DeferredErrorMessage **planningError)
|
|
||||||
{
|
|
||||||
Oid distributedTableId = cacheEntry->relationId;
|
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
CmdType commandType = query->commandType;
|
|
||||||
List *restrictClauseList = NIL;
|
|
||||||
Index tableId = 1;
|
|
||||||
List *prunedShardList = NIL;
|
|
||||||
int prunedShardCount = 0;
|
|
||||||
|
|
||||||
Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE);
|
|
||||||
|
|
||||||
restrictClauseList = QueryRestrictList(query, partitionMethod);
|
|
||||||
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
|
|
||||||
|
|
||||||
prunedShardCount = list_length(prunedShardList);
|
|
||||||
if (prunedShardCount > 1)
|
|
||||||
{
|
|
||||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
|
||||||
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
|
|
||||||
partitionKeyString);
|
|
||||||
StringInfo errorMessage = makeStringInfo();
|
|
||||||
StringInfo errorHint = makeStringInfo();
|
|
||||||
const char *commandName = NULL;
|
|
||||||
|
|
||||||
if (commandType == CMD_UPDATE)
|
|
||||||
{
|
|
||||||
commandName = "UPDATE";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
commandName = "DELETE";
|
|
||||||
}
|
|
||||||
|
|
||||||
appendStringInfo(errorHint, "Consider using an equality filter on "
|
|
||||||
"partition column \"%s\" to target a "
|
|
||||||
"single shard. If you'd like to run a "
|
|
||||||
"multi-shard operation, use "
|
|
||||||
"master_modify_multiple_shards().",
|
|
||||||
partitionColumnName);
|
|
||||||
|
|
||||||
if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND)
|
|
||||||
{
|
|
||||||
appendStringInfo(errorHint, " You can also use "
|
|
||||||
"master_apply_delete_command() to drop "
|
|
||||||
"all shards satisfying delete criteria.");
|
|
||||||
}
|
|
||||||
|
|
||||||
appendStringInfo(errorMessage,
|
|
||||||
"cannot run %s command which targets multiple shards",
|
|
||||||
commandName);
|
|
||||||
|
|
||||||
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
|
||||||
errorMessage->data, NULL,
|
|
||||||
errorHint->data);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
else if (prunedShardCount == 0)
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (ShardInterval *) linitial(prunedShardList);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* QueryRestrictList returns the restriction clauses for the query. For a SELECT
|
|
||||||
* statement these are the where-clause expressions. For INSERT statements we
|
|
||||||
* build an equality clause based on the partition-column and its supplied
|
|
||||||
* insert value.
|
|
||||||
*
|
|
||||||
* Since reference tables do not have partition columns, the function returns
|
|
||||||
* NIL for reference tables.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
QueryRestrictList(Query *query, char partitionMethod)
|
|
||||||
{
|
|
||||||
List *queryRestrictList = NIL;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Reference tables do not have the notion of partition column. Thus,
|
|
||||||
* there are no restrictions on the partition column.
|
|
||||||
*/
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
|
||||||
return queryRestrictList;
|
|
||||||
}
|
|
||||||
|
|
||||||
queryRestrictList = WhereClauseList(query->jointree);
|
|
||||||
|
|
||||||
return queryRestrictList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractFirstDistributedTableId takes a given query, and finds the relationId
|
* ExtractFirstDistributedTableId takes a given query, and finds the relationId
|
||||||
* for the first distributed table in that query. If the function cannot find a
|
* for the first distributed table in that query. If the function cannot find a
|
||||||
|
@ -1554,84 +1441,253 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* RouterSelectJob builds a Job to represent a single shard select query */
|
/* RouterJob builds a Job to represent a single shard select/update/delete query */
|
||||||
static Job *
|
static Job *
|
||||||
RouterSelectJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
|
RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
|
||||||
bool *returnQueryRoutable)
|
DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
bool queryRoutable = false;
|
|
||||||
StringInfo queryString = makeStringInfo();
|
StringInfo queryString = makeStringInfo();
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
List *placementList = NIL;
|
List *placementList = NIL;
|
||||||
List *relationShardList = NIL;
|
List *relationShardList = NIL;
|
||||||
|
List *rangeTableList = NIL;
|
||||||
bool replacePrunedQueryWithDummy = false;
|
bool replacePrunedQueryWithDummy = false;
|
||||||
|
bool requiresMasterEvaluation = false;
|
||||||
|
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||||
|
|
||||||
/* router planner should create task even if it doesn't hit a shard at all */
|
/* router planner should create task even if it doesn't hit a shard at all */
|
||||||
replacePrunedQueryWithDummy = true;
|
replacePrunedQueryWithDummy = true;
|
||||||
|
|
||||||
queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
|
/* check if this query requires master evaluation */
|
||||||
&placementList, &shardId, &relationShardList,
|
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
|
||||||
replacePrunedQueryWithDummy);
|
|
||||||
|
|
||||||
|
(*planningError) = PlanRouterQuery(originalQuery, restrictionContext,
|
||||||
if (!queryRoutable)
|
&placementList, &shardId, &relationShardList,
|
||||||
|
replacePrunedQueryWithDummy);
|
||||||
|
if (*planningError)
|
||||||
{
|
{
|
||||||
*returnQueryRoutable = false;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
job = CreateJob(originalQuery);
|
job = CreateJob(originalQuery);
|
||||||
|
|
||||||
|
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList);
|
||||||
|
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If all of the shards are pruned, we replace the relation RTE with a
|
||||||
|
* subquery RTE that returns no results. However, this is not useful
|
||||||
|
* for UPDATE and DELETE queries. Therefore, if we detect an UPDATE or
|
||||||
|
* DELETE RTE with subquery type, we just set task list to empty and return
|
||||||
|
* the job.
|
||||||
|
*/
|
||||||
|
if (updateOrDeleteRTE != NULL && updateOrDeleteRTE->rtekind == RTE_SUBQUERY)
|
||||||
|
{
|
||||||
|
job->taskList = NIL;
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
pg_get_query_def(originalQuery, queryString);
|
pg_get_query_def(originalQuery, queryString);
|
||||||
|
|
||||||
task = CreateTask(ROUTER_TASK);
|
if (originalQuery->commandType == CMD_SELECT)
|
||||||
|
{
|
||||||
|
task = CreateTask(ROUTER_TASK);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *modificationTableCacheEntry = NULL;
|
||||||
|
char modificationPartitionMethod = 0;
|
||||||
|
|
||||||
|
modificationTableCacheEntry = DistributedTableCacheEntry(
|
||||||
|
updateOrDeleteRTE->relid);
|
||||||
|
modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
|
||||||
|
|
||||||
|
if (modificationPartitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
|
SelectsFromDistributedTable(rangeTableList))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform select on a distributed table "
|
||||||
|
"and modify a reference table")));
|
||||||
|
}
|
||||||
|
|
||||||
|
task = CreateTask(MODIFY_TASK);
|
||||||
|
task->replicationModel = modificationTableCacheEntry->replicationModel;
|
||||||
|
}
|
||||||
|
|
||||||
task->queryString = queryString->data;
|
task->queryString = queryString->data;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = placementList;
|
task->taskPlacementList = placementList;
|
||||||
task->relationShardList = relationShardList;
|
task->relationShardList = relationShardList;
|
||||||
|
|
||||||
job->taskList = list_make1(task);
|
job->taskList = list_make1(task);
|
||||||
|
job->requiresMasterEvaluation = requiresMasterEvaluation;
|
||||||
*returnQueryRoutable = true;
|
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterSelectQuery returns true if the input query can be pushed down to the
|
* GetUpdateOrDeleteRTE walks over the given range table list, and checks if
|
||||||
* worker node as it is. Otherwise, the function returns false.
|
* it has an UPDATE or DELETE RTE. If it finds one, it returns it immediately.
|
||||||
*
|
|
||||||
* On return true, all RTEs have been updated to point to the relevant shards in
|
|
||||||
* the originalQuery. Also, placementList is filled with the list of worker nodes
|
|
||||||
* that has all the required shard placements for the query execution.
|
|
||||||
* anchorShardId is set to the first pruned shardId of the given query. Finally,
|
|
||||||
* relationShardList is filled with the list of relation-to-shard mappings for
|
|
||||||
* the query.
|
|
||||||
*/
|
*/
|
||||||
bool
|
static RangeTblEntry *
|
||||||
RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
|
GetUpdateOrDeleteRTE(List *rangeTableList)
|
||||||
List **placementList, uint64 *anchorShardId, List **relationShardList,
|
|
||||||
bool replacePrunedQueryWithDummy)
|
|
||||||
{
|
{
|
||||||
List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery,
|
ListCell *rangeTableCell = NULL;
|
||||||
restrictionContext);
|
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
|
||||||
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = originalQuery->commandType;
|
|
||||||
ListCell *prunedRelationShardListCell = NULL;
|
|
||||||
List *workerList = NIL;
|
|
||||||
bool shardsPresent = false;
|
|
||||||
|
|
||||||
*placementList = NIL;
|
foreach(rangeTableCell, rangeTableList)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||||
|
|
||||||
if (prunedRelationShardList == NULL)
|
if (UpdateOrDeleteRTE(rangeTableEntry))
|
||||||
|
{
|
||||||
|
return rangeTableEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateOrDeleteRTE checks if the given range table entry is an UPDATE or
|
||||||
|
* DELETE RTE by checking required permissions on it.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
UpdateOrDeleteRTE(RangeTblEntry *rangeTableEntry)
|
||||||
|
{
|
||||||
|
if ((ACL_UPDATE & rangeTableEntry->requiredPerms) ||
|
||||||
|
(ACL_DELETE & rangeTableEntry->requiredPerms))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Assert(commandType == CMD_SELECT);
|
|
||||||
|
/*
|
||||||
|
* SelectsFromDistributedTable checks if there is a select on a distributed
|
||||||
|
* table by looking into range table entries.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
SelectsFromDistributedTable(List *rangeTableList)
|
||||||
|
{
|
||||||
|
ListCell *rangeTableCell = NULL;
|
||||||
|
foreach(rangeTableCell, rangeTableList)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||||
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
|
|
||||||
|
if (rangeTableEntry->relid == InvalidOid)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheEntry = DistributedTableCacheEntry(rangeTableEntry->relid);
|
||||||
|
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||||
|
!UpdateOrDeleteRTE(rangeTableEntry))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PlanRouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries.
|
||||||
|
* If there are shards present and query is routable, all RTEs have been updated
|
||||||
|
* to point to the relevant shards in the originalQuery. Also, placementList is
|
||||||
|
* filled with the list of worker nodes that have all the required shard placements
|
||||||
|
* for the query execution. anchorShardId is set to the first pruned shardId of
|
||||||
|
* the given query. Finally, relationShardList is filled with the list of
|
||||||
|
* relation-to-shard mappings for the query.
|
||||||
|
*
|
||||||
|
* If the given query is not routable, it fills planningError with the related
|
||||||
|
* DeferredErrorMessage. The caller can check this error message to see if query
|
||||||
|
* is routable or not.
|
||||||
|
*/
|
||||||
|
DeferredErrorMessage *
|
||||||
|
PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
|
||||||
|
List **placementList, uint64 *anchorShardId, List **relationShardList,
|
||||||
|
bool replacePrunedQueryWithDummy)
|
||||||
|
{
|
||||||
|
bool multiShardQuery = false;
|
||||||
|
List *prunedRelationShardList = NIL;
|
||||||
|
DeferredErrorMessage *planningError = NULL;
|
||||||
|
ListCell *prunedRelationShardListCell = NULL;
|
||||||
|
List *workerList = NIL;
|
||||||
|
bool shardsPresent = false;
|
||||||
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
|
|
||||||
|
*placementList = NIL;
|
||||||
|
prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery,
|
||||||
|
restrictionContext,
|
||||||
|
&multiShardQuery);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If multiShardQuery is true then it means a relation has more
|
||||||
|
* than one shard left after pruning.
|
||||||
|
*/
|
||||||
|
if (multiShardQuery)
|
||||||
|
{
|
||||||
|
StringInfo errorMessage = makeStringInfo();
|
||||||
|
StringInfo errorHint = makeStringInfo();
|
||||||
|
CmdType commandType = originalQuery->commandType;
|
||||||
|
const char *commandName = "SELECT";
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE)
|
||||||
|
{
|
||||||
|
commandName = "UPDATE";
|
||||||
|
}
|
||||||
|
else if (commandType == CMD_DELETE)
|
||||||
|
{
|
||||||
|
commandName = "DELETE";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
|
||||||
|
{
|
||||||
|
List *rangeTableList = NIL;
|
||||||
|
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||||
|
DistTableCacheEntry *updateOrDeleteTableCacheEntry = NULL;
|
||||||
|
char *partitionKeyString = NULL;
|
||||||
|
char *partitionColumnName = NULL;
|
||||||
|
|
||||||
|
/* extract range table entries */
|
||||||
|
ExtractRangeTableEntryWalker((Node *) originalQuery, &rangeTableList);
|
||||||
|
|
||||||
|
updateOrDeleteRTE = GetUpdateOrDeleteRTE(rangeTableList);
|
||||||
|
updateOrDeleteTableCacheEntry =
|
||||||
|
DistributedTableCacheEntry(updateOrDeleteRTE->relid);
|
||||||
|
|
||||||
|
partitionKeyString = updateOrDeleteTableCacheEntry->partitionKeyString;
|
||||||
|
partitionColumnName = ColumnNameToColumn(updateOrDeleteRTE->relid,
|
||||||
|
partitionKeyString);
|
||||||
|
|
||||||
|
appendStringInfo(errorHint, "Consider using an equality filter on "
|
||||||
|
"partition column \"%s\" to target a "
|
||||||
|
"single shard. If you'd like to run a "
|
||||||
|
"multi-shard operation, use "
|
||||||
|
"master_modify_multiple_shards().",
|
||||||
|
partitionColumnName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* note that for SELECT queries, we never print this error message */
|
||||||
|
appendStringInfo(errorMessage,
|
||||||
|
"cannot run %s command which targets multiple shards",
|
||||||
|
commandName);
|
||||||
|
|
||||||
|
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
errorMessage->data, NULL,
|
||||||
|
errorHint->data);
|
||||||
|
return planningError;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(prunedRelationShardListCell, prunedRelationShardList)
|
foreach(prunedRelationShardListCell, prunedRelationShardList)
|
||||||
{
|
{
|
||||||
|
@ -1673,7 +1729,10 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
|
||||||
*/
|
*/
|
||||||
if (RelationPrunesToMultipleShards(*relationShardList))
|
if (RelationPrunesToMultipleShards(*relationShardList))
|
||||||
{
|
{
|
||||||
return false;
|
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot run command which targets "
|
||||||
|
"multiple shards", NULL, NULL);
|
||||||
|
return planningError;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1708,44 +1767,53 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
|
||||||
* shard intervals. Thus, we should return empty list if there aren't any matching
|
* shard intervals. Thus, we should return empty list if there aren't any matching
|
||||||
* workers, so that the caller can decide what to do with this task.
|
* workers, so that the caller can decide what to do with this task.
|
||||||
*/
|
*/
|
||||||
workerList = NIL;
|
return NULL;
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (workerList == NIL)
|
if (workerList == NIL)
|
||||||
{
|
{
|
||||||
ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
|
ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
|
||||||
|
|
||||||
return false;
|
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"found no worker with all shard placements",
|
||||||
|
NULL, NULL);
|
||||||
|
return planningError;
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
|
/*
|
||||||
|
* If this is an UPDATE or DELETE query which requires master evaluation,
|
||||||
|
* don't try update shard names, and postpone that to execution phase.
|
||||||
|
*/
|
||||||
|
if (!(UpdateOrDeleteQuery(originalQuery) && RequiresMasterEvaluation(originalQuery)))
|
||||||
|
{
|
||||||
|
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
|
||||||
|
}
|
||||||
|
|
||||||
*placementList = workerList;
|
*placementList = workerList;
|
||||||
*anchorShardId = shardId;
|
*anchorShardId = shardId;
|
||||||
|
|
||||||
return true;
|
return planningError;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TargetShardIntervalsForSelect performs shard pruning for all referenced relations
|
* TargetShardIntervalsForRouter performs shard pruning for all referenced relations
|
||||||
* in the query and returns list of shards per relation. Shard pruning is done based
|
* in the query and returns list of shards per relation. Shard pruning is done based
|
||||||
* on provided restriction context per relation. The function bails out and returns NULL
|
* on provided restriction context per relation. The function bails out and returns
|
||||||
* if any of the relations pruned down to more than one active shard. It also records
|
* after setting multiShardQuery to true if any of the relations pruned down to
|
||||||
* pruned shard intervals in relation restriction context to be used later on. Some
|
* more than one active shard. It also records pruned shard intervals in relation
|
||||||
* queries may have contradiction clauses like 'and false' or 'and 1=0', such queries
|
* restriction context to be used later on. Some queries may have contradiction
|
||||||
* are treated as if all of the shards of joining relations are pruned out.
|
* clauses like 'and false' or 'and 1=0', such queries are treated as if all of
|
||||||
|
* the shards of joining relations are pruned out.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
TargetShardIntervalsForSelect(Query *query,
|
TargetShardIntervalsForRouter(Query *query,
|
||||||
RelationRestrictionContext *restrictionContext)
|
RelationRestrictionContext *restrictionContext,
|
||||||
|
bool *multiShardQuery)
|
||||||
{
|
{
|
||||||
List *prunedRelationShardList = NIL;
|
List *prunedRelationShardList = NIL;
|
||||||
ListCell *restrictionCell = NULL;
|
ListCell *restrictionCell = NULL;
|
||||||
|
|
||||||
Assert(query->commandType == CMD_SELECT);
|
|
||||||
Assert(restrictionContext != NULL);
|
Assert(restrictionContext != NULL);
|
||||||
|
|
||||||
foreach(restrictionCell, restrictionContext->relationRestrictionList)
|
foreach(restrictionCell, restrictionContext->relationRestrictionList)
|
||||||
|
@ -1774,8 +1842,7 @@ TargetShardIntervalsForSelect(Query *query,
|
||||||
whereFalseQuery = ContainsFalseClause(pseudoRestrictionList);
|
whereFalseQuery = ContainsFalseClause(pseudoRestrictionList);
|
||||||
if (!whereFalseQuery && shardCount > 0)
|
if (!whereFalseQuery && shardCount > 0)
|
||||||
{
|
{
|
||||||
prunedShardList = PruneShards(relationId, tableId,
|
prunedShardList = PruneShards(relationId, tableId, restrictClauseList);
|
||||||
restrictClauseList);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Quick bail out. The query can not be router plannable if one
|
* Quick bail out. The query can not be router plannable if one
|
||||||
|
@ -1785,7 +1852,9 @@ TargetShardIntervalsForSelect(Query *query,
|
||||||
*/
|
*/
|
||||||
if (list_length(prunedShardList) > 1)
|
if (list_length(prunedShardList) > 1)
|
||||||
{
|
{
|
||||||
return NULL;
|
(*multiShardQuery) = true;
|
||||||
|
|
||||||
|
return NIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1992,8 +2061,8 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC
|
||||||
RelationRestrictionContext *
|
RelationRestrictionContext *
|
||||||
CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
|
CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
|
||||||
{
|
{
|
||||||
RelationRestrictionContext *newContext = (RelationRestrictionContext *)
|
RelationRestrictionContext *newContext =
|
||||||
palloc(sizeof(RelationRestrictionContext));
|
(RelationRestrictionContext *) palloc(sizeof(RelationRestrictionContext));
|
||||||
ListCell *relationRestrictionCell = NULL;
|
ListCell *relationRestrictionCell = NULL;
|
||||||
|
|
||||||
newContext->hasDistributedRelation = oldContext->hasDistributedRelation;
|
newContext->hasDistributedRelation = oldContext->hasDistributedRelation;
|
||||||
|
|
|
@ -3169,20 +3169,43 @@ get_update_query_def(Query *query, deparse_context *context)
|
||||||
* Start the query with UPDATE relname SET
|
* Start the query with UPDATE relname SET
|
||||||
*/
|
*/
|
||||||
rte = rt_fetch(query->resultRelation, query->rtable);
|
rte = rt_fetch(query->resultRelation, query->rtable);
|
||||||
Assert(rte->rtekind == RTE_RELATION);
|
|
||||||
if (PRETTY_INDENT(context))
|
if (PRETTY_INDENT(context))
|
||||||
{
|
{
|
||||||
appendStringInfoChar(buf, ' ');
|
appendStringInfoChar(buf, ' ');
|
||||||
context->indentLevel += PRETTYINDENT_STD;
|
context->indentLevel += PRETTYINDENT_STD;
|
||||||
}
|
}
|
||||||
appendStringInfo(buf, "UPDATE %s%s",
|
|
||||||
only_marker(rte),
|
/* if it's a shard, do differently */
|
||||||
generate_relation_or_shard_name(rte->relid,
|
if (GetRangeTblKind(rte) == CITUS_RTE_SHARD)
|
||||||
context->distrelid,
|
{
|
||||||
context->shardid, NIL));
|
char *fragmentSchemaName = NULL;
|
||||||
if (rte->alias != NULL)
|
char *fragmentTableName = NULL;
|
||||||
appendStringInfo(buf, " %s",
|
|
||||||
quote_identifier(rte->alias->aliasname));
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
|
/* use schema and table name from the remote alias */
|
||||||
|
appendStringInfo(buf, "UPDATE %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_fragment_name(fragmentSchemaName, fragmentTableName));
|
||||||
|
|
||||||
|
if(rte->eref != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->eref->aliasname));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(buf, "UPDATE %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_relation_or_shard_name(rte->relid,
|
||||||
|
context->distrelid,
|
||||||
|
context->shardid, NIL));
|
||||||
|
|
||||||
|
if (rte->alias != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->alias->aliasname));
|
||||||
|
}
|
||||||
|
|
||||||
appendStringInfoString(buf, " SET ");
|
appendStringInfoString(buf, " SET ");
|
||||||
|
|
||||||
/* Deparse targetlist */
|
/* Deparse targetlist */
|
||||||
|
@ -3366,20 +3389,42 @@ get_delete_query_def(Query *query, deparse_context *context)
|
||||||
* Start the query with DELETE FROM relname
|
* Start the query with DELETE FROM relname
|
||||||
*/
|
*/
|
||||||
rte = rt_fetch(query->resultRelation, query->rtable);
|
rte = rt_fetch(query->resultRelation, query->rtable);
|
||||||
Assert(rte->rtekind == RTE_RELATION);
|
|
||||||
if (PRETTY_INDENT(context))
|
if (PRETTY_INDENT(context))
|
||||||
{
|
{
|
||||||
appendStringInfoChar(buf, ' ');
|
appendStringInfoChar(buf, ' ');
|
||||||
context->indentLevel += PRETTYINDENT_STD;
|
context->indentLevel += PRETTYINDENT_STD;
|
||||||
}
|
}
|
||||||
appendStringInfo(buf, "DELETE FROM %s%s",
|
|
||||||
only_marker(rte),
|
/* if it's a shard, do differently */
|
||||||
generate_relation_or_shard_name(rte->relid,
|
if (GetRangeTblKind(rte) == CITUS_RTE_SHARD)
|
||||||
context->distrelid,
|
{
|
||||||
context->shardid, NIL));
|
char *fragmentSchemaName = NULL;
|
||||||
if (rte->alias != NULL)
|
char *fragmentTableName = NULL;
|
||||||
appendStringInfo(buf, " %s",
|
|
||||||
quote_identifier(rte->alias->aliasname));
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
|
/* use schema and table name from the remote alias */
|
||||||
|
appendStringInfo(buf, "DELETE FROM %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_fragment_name(fragmentSchemaName, fragmentTableName));
|
||||||
|
|
||||||
|
if(rte->eref != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->eref->aliasname));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(buf, "DELETE FROM %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_relation_or_shard_name(rte->relid,
|
||||||
|
context->distrelid,
|
||||||
|
context->shardid, NIL));
|
||||||
|
|
||||||
|
if (rte->alias != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->alias->aliasname));
|
||||||
|
}
|
||||||
|
|
||||||
/* Add the USING clause if given */
|
/* Add the USING clause if given */
|
||||||
get_from_clause(query, " USING ", context);
|
get_from_clause(query, " USING ", context);
|
||||||
|
@ -6857,7 +6902,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
|
||||||
|
|
||||||
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
/* Use schema and table name from the remote alias */
|
/* use schema and table name from the remote alias */
|
||||||
appendStringInfoString(buf,
|
appendStringInfoString(buf,
|
||||||
generate_fragment_name(fragmentSchemaName,
|
generate_fragment_name(fragmentSchemaName,
|
||||||
fragmentTableName));
|
fragmentTableName));
|
||||||
|
|
|
@ -3152,20 +3152,43 @@ get_update_query_def(Query *query, deparse_context *context)
|
||||||
* Start the query with UPDATE relname SET
|
* Start the query with UPDATE relname SET
|
||||||
*/
|
*/
|
||||||
rte = rt_fetch(query->resultRelation, query->rtable);
|
rte = rt_fetch(query->resultRelation, query->rtable);
|
||||||
Assert(rte->rtekind == RTE_RELATION);
|
|
||||||
if (PRETTY_INDENT(context))
|
if (PRETTY_INDENT(context))
|
||||||
{
|
{
|
||||||
appendStringInfoChar(buf, ' ');
|
appendStringInfoChar(buf, ' ');
|
||||||
context->indentLevel += PRETTYINDENT_STD;
|
context->indentLevel += PRETTYINDENT_STD;
|
||||||
}
|
}
|
||||||
appendStringInfo(buf, "UPDATE %s%s",
|
|
||||||
only_marker(rte),
|
/* if it's a shard, do differently */
|
||||||
generate_relation_or_shard_name(rte->relid,
|
if (GetRangeTblKind(rte) == CITUS_RTE_SHARD)
|
||||||
context->distrelid,
|
{
|
||||||
context->shardid, NIL));
|
char *fragmentSchemaName = NULL;
|
||||||
if (rte->alias != NULL)
|
char *fragmentTableName = NULL;
|
||||||
appendStringInfo(buf, " %s",
|
|
||||||
quote_identifier(rte->alias->aliasname));
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
|
/* use schema and table name from the remote alias */
|
||||||
|
appendStringInfo(buf, "UPDATE %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_fragment_name(fragmentSchemaName, fragmentTableName));
|
||||||
|
|
||||||
|
if(rte->eref != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->eref->aliasname));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(buf, "UPDATE %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_relation_or_shard_name(rte->relid,
|
||||||
|
context->distrelid,
|
||||||
|
context->shardid, NIL));
|
||||||
|
|
||||||
|
if (rte->alias != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->alias->aliasname));
|
||||||
|
}
|
||||||
|
|
||||||
appendStringInfoString(buf, " SET ");
|
appendStringInfoString(buf, " SET ");
|
||||||
|
|
||||||
/* Deparse targetlist */
|
/* Deparse targetlist */
|
||||||
|
@ -3349,20 +3372,42 @@ get_delete_query_def(Query *query, deparse_context *context)
|
||||||
* Start the query with DELETE FROM relname
|
* Start the query with DELETE FROM relname
|
||||||
*/
|
*/
|
||||||
rte = rt_fetch(query->resultRelation, query->rtable);
|
rte = rt_fetch(query->resultRelation, query->rtable);
|
||||||
Assert(rte->rtekind == RTE_RELATION);
|
|
||||||
if (PRETTY_INDENT(context))
|
if (PRETTY_INDENT(context))
|
||||||
{
|
{
|
||||||
appendStringInfoChar(buf, ' ');
|
appendStringInfoChar(buf, ' ');
|
||||||
context->indentLevel += PRETTYINDENT_STD;
|
context->indentLevel += PRETTYINDENT_STD;
|
||||||
}
|
}
|
||||||
appendStringInfo(buf, "DELETE FROM %s%s",
|
|
||||||
only_marker(rte),
|
/* if it's a shard, do differently */
|
||||||
generate_relation_or_shard_name(rte->relid,
|
if (GetRangeTblKind(rte) == CITUS_RTE_SHARD)
|
||||||
context->distrelid,
|
{
|
||||||
context->shardid, NIL));
|
char *fragmentSchemaName = NULL;
|
||||||
if (rte->alias != NULL)
|
char *fragmentTableName = NULL;
|
||||||
appendStringInfo(buf, " %s",
|
|
||||||
quote_identifier(rte->alias->aliasname));
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
|
/* use schema and table name from the remote alias */
|
||||||
|
appendStringInfo(buf, "DELETE FROM %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_fragment_name(fragmentSchemaName, fragmentTableName));
|
||||||
|
|
||||||
|
if(rte->eref != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->eref->aliasname));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(buf, "DELETE FROM %s%s",
|
||||||
|
only_marker(rte),
|
||||||
|
generate_relation_or_shard_name(rte->relid,
|
||||||
|
context->distrelid,
|
||||||
|
context->shardid, NIL));
|
||||||
|
|
||||||
|
if (rte->alias != NULL)
|
||||||
|
appendStringInfo(buf, " %s",
|
||||||
|
quote_identifier(rte->alias->aliasname));
|
||||||
|
}
|
||||||
|
|
||||||
/* Add the USING clause if given */
|
/* Add the USING clause if given */
|
||||||
get_from_clause(query, " USING ", context);
|
get_from_clause(query, " USING ", context);
|
||||||
|
@ -6577,7 +6622,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
|
||||||
|
|
||||||
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
ExtractRangeTblExtraData(rte, NULL, &fragmentSchemaName, &fragmentTableName, NULL);
|
||||||
|
|
||||||
/* Use schema and table name from the remote alias */
|
/* use schema and table name from the remote alias */
|
||||||
appendStringInfoString(buf,
|
appendStringInfoString(buf,
|
||||||
generate_fragment_name(fragmentSchemaName,
|
generate_fragment_name(fragmentSchemaName,
|
||||||
fragmentTableName));
|
fragmentTableName));
|
||||||
|
|
|
@ -31,12 +31,16 @@ extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query,
|
||||||
extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query,
|
extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
extern bool RouterSelectQuery(Query *originalQuery,
|
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
|
||||||
RelationRestrictionContext *restrictionContext,
|
RelationRestrictionContext *
|
||||||
List **placementList, uint64 *anchorShardId,
|
restrictionContext,
|
||||||
List **relationShardList, bool replacePrunedQueryWithDummy);
|
List **placementList, uint64 *anchorShardId,
|
||||||
|
List **relationShardList, bool
|
||||||
|
replacePrunedQueryWithDummy);
|
||||||
|
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
|
||||||
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
|
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
|
||||||
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree);
|
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree,
|
||||||
|
bool multiShardQuery);
|
||||||
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
|
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
|
||||||
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
||||||
RelationRestrictionContext *oldContext);
|
RelationRestrictionContext *oldContext);
|
||||||
|
@ -46,7 +50,6 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||||
ShardInterval *shardInterval);
|
ShardInterval *shardInterval);
|
||||||
extern List * RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError);
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||||
|
|
|
@ -317,8 +317,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57638 dbname=regression
|
Node: host=localhost port=57638 dbname=regression
|
||||||
-> Update on lineitem_290000
|
-> Update on lineitem_290000 lineitem
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test delete
|
-- Test delete
|
||||||
|
@ -330,8 +330,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57638 dbname=regression
|
Node: host=localhost port=57638 dbname=regression
|
||||||
-> Delete on lineitem_290000
|
-> Delete on lineitem_290000 lineitem
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test zero-shard update
|
-- Test zero-shard update
|
||||||
|
|
|
@ -317,8 +317,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57638 dbname=regression
|
Node: host=localhost port=57638 dbname=regression
|
||||||
-> Update on lineitem_290000
|
-> Update on lineitem_290000 lineitem
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test delete
|
-- Test delete
|
||||||
|
@ -330,8 +330,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57638 dbname=regression
|
Node: host=localhost port=57638 dbname=regression
|
||||||
-> Delete on lineitem_290000
|
-> Delete on lineitem_290000 lineitem
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test zero-shard update
|
-- Test zero-shard update
|
||||||
|
|
|
@ -657,3 +657,430 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE app_analytics_events;
|
DROP TABLE app_analytics_events;
|
||||||
|
-- test UPDATE with subqueries
|
||||||
|
CREATE TABLE raw_table (id bigint, value bigint);
|
||||||
|
CREATE TABLE summary_table (
|
||||||
|
id bigint,
|
||||||
|
min_value numeric,
|
||||||
|
average_value numeric,
|
||||||
|
count int,
|
||||||
|
uniques int);
|
||||||
|
SELECT create_distributed_table('raw_table', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('summary_table', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO raw_table VALUES (1, 100);
|
||||||
|
INSERT INTO raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO raw_table VALUES (1, 300);
|
||||||
|
INSERT INTO raw_table VALUES (2, 400);
|
||||||
|
INSERT INTO raw_table VALUES (2, 500);
|
||||||
|
INSERT INTO summary_table VALUES (1);
|
||||||
|
INSERT INTO summary_table VALUES (2);
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+---------------+-------+---------
|
||||||
|
1 | | | |
|
||||||
|
2 | | | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | | 200.0000000000000000 | |
|
||||||
|
2 | | | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- try different syntax
|
||||||
|
UPDATE summary_table SET (min_value, average_value) =
|
||||||
|
(SELECT min(value), avg(value) FROM raw_table WHERE id = 2)
|
||||||
|
WHERE id = 2;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
UPDATE summary_table SET min_value = 100
|
||||||
|
WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value > 100) AND id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- indeed, we don't need filter on UPDATE explicitly if SELECT already prunes to one shard
|
||||||
|
UPDATE summary_table SET uniques = 2
|
||||||
|
WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value IN (100, 200));
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- use inner results for non-partition column
|
||||||
|
UPDATE summary_table SET uniques = NULL
|
||||||
|
WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- these should not update anything
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1 AND id = 4;
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1 AND id = 4;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update with NULL value
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- unsupported multi-shard updates
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table) average_query;
|
||||||
|
ERROR: cannot run UPDATE command which targets multiple shards
|
||||||
|
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
||||||
|
UPDATE summary_table SET average_value = average_value + 1 WHERE id =
|
||||||
|
(SELECT id FROM raw_table WHERE value > 100);
|
||||||
|
ERROR: cannot run UPDATE command which targets multiple shards
|
||||||
|
HINT: Consider using an equality filter on partition column "id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
||||||
|
-- test complex queries
|
||||||
|
UPDATE summary_table
|
||||||
|
SET
|
||||||
|
uniques = metrics.expensive_uniques,
|
||||||
|
count = metrics.total_count
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
id,
|
||||||
|
count(DISTINCT (CASE WHEN value > 100 then value end)) AS expensive_uniques,
|
||||||
|
count(value) AS total_count
|
||||||
|
FROM raw_table
|
||||||
|
WHERE id = 1
|
||||||
|
GROUP BY id) metrics
|
||||||
|
WHERE
|
||||||
|
summary_table.id = metrics.id AND
|
||||||
|
summary_table.id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | | 4 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- test joins
|
||||||
|
UPDATE summary_table SET count = count + 1 FROM raw_table
|
||||||
|
WHERE raw_table.id = summary_table.id AND summary_table.id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | | 5 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- test with prepared statements
|
||||||
|
PREPARE prepared_update_with_subquery(int, int) AS
|
||||||
|
UPDATE summary_table SET count = count + $1 FROM raw_table
|
||||||
|
WHERE raw_table.id = summary_table.id AND summary_table.id = $2;
|
||||||
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | | 65 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- test with reference tables
|
||||||
|
CREATE TABLE reference_raw_table (id bigint, value bigint);
|
||||||
|
CREATE TABLE reference_summary_table (
|
||||||
|
id bigint,
|
||||||
|
min_value numeric,
|
||||||
|
average_value numeric,
|
||||||
|
count int,
|
||||||
|
uniques int);
|
||||||
|
SELECT create_reference_table('reference_raw_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_reference_table('reference_summary_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 100);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 300);
|
||||||
|
INSERT INTO reference_raw_table VALUES (2, 400);
|
||||||
|
INSERT INTO reference_raw_table VALUES (2, 500);
|
||||||
|
INSERT INTO reference_summary_table VALUES (1);
|
||||||
|
INSERT INTO reference_summary_table VALUES (2);
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+---------------+-------+---------
|
||||||
|
1 | | | |
|
||||||
|
2 | | | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
UPDATE reference_summary_table SET (min_value, average_value) =
|
||||||
|
(SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2)
|
||||||
|
WHERE id = 2;
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- no need for partition column equalities on reference tables
|
||||||
|
UPDATE reference_summary_table SET (count) =
|
||||||
|
(SELECT id AS inner_id FROM reference_raw_table WHERE value = 500)
|
||||||
|
WHERE min_value = 400;
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | | 200.0000000000000000 | |
|
||||||
|
2 | 400 | 450.0000000000000000 | 2 |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- can read from a reference table and update a distributed table
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
-- cannot read from a distributed table and update a reference table
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
ERROR: cannot perform select on a distributed table and modify a reference table
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 2
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
ERROR: cannot perform select on a distributed table and modify a reference table
|
||||||
|
-- test master_modify_multiple_shards() with subqueries and expect to fail
|
||||||
|
SELECT master_modify_multiple_shards('
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1');
|
||||||
|
ERROR: cannot perform distributed planning for the given modifications
|
||||||
|
DETAIL: Subqueries are not supported in distributed modifications.
|
||||||
|
-- test connection API via using COPY
|
||||||
|
-- COPY on SELECT part
|
||||||
|
BEGIN;
|
||||||
|
\COPY raw_table FROM STDIN WITH CSV
|
||||||
|
INSERT INTO summary_table VALUES (3);
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 3
|
||||||
|
) average_query
|
||||||
|
WHERE id = 3;
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | 65 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- COPY on UPDATE part
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO raw_table VALUES (4, 100);
|
||||||
|
INSERT INTO raw_table VALUES (4, 200);
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 4;
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | 65 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- COPY on both parts
|
||||||
|
BEGIN;
|
||||||
|
\COPY raw_table FROM STDIN WITH CSV
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 5
|
||||||
|
) average_query
|
||||||
|
WHERE id = 5;
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | 65 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- COPY on reference tables
|
||||||
|
BEGIN;
|
||||||
|
\COPY reference_raw_table FROM STDIN WITH CSV
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 6
|
||||||
|
) average_query
|
||||||
|
WHERE id = 6;
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
1 | 100 | 200.0000000000000000 | 65 | 2
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
6 | | 150.0000000000000000 | |
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
-- test DELETE queries
|
||||||
|
SELECT * FROM raw_table ORDER BY id, value;
|
||||||
|
id | value
|
||||||
|
----+-------
|
||||||
|
1 | 100
|
||||||
|
1 | 200
|
||||||
|
1 | 200
|
||||||
|
1 | 300
|
||||||
|
2 | 400
|
||||||
|
2 | 500
|
||||||
|
3 | 100
|
||||||
|
3 | 200
|
||||||
|
4 | 100
|
||||||
|
4 | 200
|
||||||
|
5 | 100
|
||||||
|
5 | 200
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
DELETE FROM summary_table
|
||||||
|
WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
2 | 400 | 450.0000000000000000 | |
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
6 | | 150.0000000000000000 | |
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- test with different syntax
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = 2;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
6 | | 150.0000000000000000 | |
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- cannot read from a distributed table and delete from a reference table
|
||||||
|
DELETE FROM reference_summary_table USING raw_table
|
||||||
|
WHERE reference_summary_table.id = raw_table.id AND raw_table.id = 3;
|
||||||
|
ERROR: cannot perform select on a distributed table and modify a reference table
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
6 | | 150.0000000000000000 | |
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- test connection API via using COPY with DELETEs
|
||||||
|
BEGIN;
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = 1;
|
||||||
|
DELETE FROM summary_table USING reference_raw_table
|
||||||
|
WHERE summary_table.id = reference_raw_table.id AND reference_raw_table.id = 2;
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+----------------------+-------+---------
|
||||||
|
3 | | 150.0000000000000000 | |
|
||||||
|
4 | | 150.0000000000000000 | |
|
||||||
|
5 | | 150.0000000000000000 | |
|
||||||
|
6 | | 150.0000000000000000 | |
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- test DELETEs with prepared statements
|
||||||
|
PREPARE prepared_delete_with_join(int) AS
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = $1;
|
||||||
|
INSERT INTO raw_table VALUES (6, 100);
|
||||||
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
EXECUTE prepared_delete_with_join(1);
|
||||||
|
EXECUTE prepared_delete_with_join(2);
|
||||||
|
EXECUTE prepared_delete_with_join(3);
|
||||||
|
EXECUTE prepared_delete_with_join(4);
|
||||||
|
EXECUTE prepared_delete_with_join(5);
|
||||||
|
EXECUTE prepared_delete_with_join(6);
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
id | min_value | average_value | count | uniques
|
||||||
|
----+-----------+---------------+-------+---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE raw_table;
|
||||||
|
DROP TABLE summary_table;
|
||||||
|
DROP TABLE reference_raw_table;
|
||||||
|
DROP TABLE reference_summary_table;
|
||||||
|
|
|
@ -335,8 +335,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57637 dbname=regression
|
Node: host=localhost port=57637 dbname=regression
|
||||||
-> Update on lineitem_mx_1220052
|
-> Update on lineitem_mx_1220052 lineitem_mx
|
||||||
-> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052
|
-> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test delete
|
-- Test delete
|
||||||
|
@ -348,8 +348,8 @@ Custom Scan (Citus Router)
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
-> Task
|
-> Task
|
||||||
Node: host=localhost port=57637 dbname=regression
|
Node: host=localhost port=57637 dbname=regression
|
||||||
-> Delete on lineitem_mx_1220052
|
-> Delete on lineitem_mx_1220052 lineitem_mx
|
||||||
-> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052
|
-> Index Scan using lineitem_mx_pkey_1220052 on lineitem_mx_1220052 lineitem_mx
|
||||||
Index Cond: (l_orderkey = 1)
|
Index Cond: (l_orderkey = 1)
|
||||||
Filter: (l_partkey = 0)
|
Filter: (l_partkey = 0)
|
||||||
-- Test single-shard SELECT
|
-- Test single-shard SELECT
|
||||||
|
|
|
@ -236,7 +236,8 @@ INSERT INTO dropcol_distributed AS dropcol (key, keep1) VALUES (1, '5') ON CONFL
|
||||||
-- subquery in the SET clause
|
-- subquery in the SET clause
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
||||||
UPDATE SET other_col = (SELECT count(*) from upsert_test);
|
UPDATE SET other_col = (SELECT count(*) from upsert_test);
|
||||||
ERROR: subqueries are not supported in distributed modifications
|
ERROR: cannot perform distributed planning for the given modifications
|
||||||
|
DETAIL: Subqueries are not supported in distributed modifications.
|
||||||
-- non mutable function call in the SET
|
-- non mutable function call in the SET
|
||||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
|
||||||
UPDATE SET other_col = random()::int;
|
UPDATE SET other_col = random()::int;
|
||||||
|
|
|
@ -30,7 +30,7 @@ DETAIL: Where clause includes a column other than partition column
|
||||||
-- Check that free-form deletes are not supported.
|
-- Check that free-form deletes are not supported.
|
||||||
DELETE FROM customer_delete_protocol WHERE c_custkey > 100;
|
DELETE FROM customer_delete_protocol WHERE c_custkey > 100;
|
||||||
ERROR: cannot run DELETE command which targets multiple shards
|
ERROR: cannot run DELETE command which targets multiple shards
|
||||||
HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). You can also use master_apply_delete_command() to drop all shards satisfying delete criteria.
|
HINT: Consider using an equality filter on partition column "c_custkey" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
|
||||||
-- Check that we delete a shard if and only if all rows in the shard satisfy the condition.
|
-- Check that we delete a shard if and only if all rows in the shard satisfy the condition.
|
||||||
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
|
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
|
||||||
WHERE c_custkey > 6500');
|
WHERE c_custkey > 6500');
|
||||||
|
|
|
@ -429,3 +429,335 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING i
|
||||||
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
||||||
|
|
||||||
DROP TABLE app_analytics_events;
|
DROP TABLE app_analytics_events;
|
||||||
|
|
||||||
|
-- test UPDATE with subqueries
|
||||||
|
CREATE TABLE raw_table (id bigint, value bigint);
|
||||||
|
CREATE TABLE summary_table (
|
||||||
|
id bigint,
|
||||||
|
min_value numeric,
|
||||||
|
average_value numeric,
|
||||||
|
count int,
|
||||||
|
uniques int);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('raw_table', 'id');
|
||||||
|
SELECT create_distributed_table('summary_table', 'id');
|
||||||
|
|
||||||
|
INSERT INTO raw_table VALUES (1, 100);
|
||||||
|
INSERT INTO raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO raw_table VALUES (1, 300);
|
||||||
|
INSERT INTO raw_table VALUES (2, 400);
|
||||||
|
INSERT INTO raw_table VALUES (2, 500);
|
||||||
|
|
||||||
|
INSERT INTO summary_table VALUES (1);
|
||||||
|
INSERT INTO summary_table VALUES (2);
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- try different syntax
|
||||||
|
UPDATE summary_table SET (min_value, average_value) =
|
||||||
|
(SELECT min(value), avg(value) FROM raw_table WHERE id = 2)
|
||||||
|
WHERE id = 2;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
UPDATE summary_table SET min_value = 100
|
||||||
|
WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value > 100) AND id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- indeed, we don't need filter on UPDATE explicitly if SELECT already prunes to one shard
|
||||||
|
UPDATE summary_table SET uniques = 2
|
||||||
|
WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value IN (100, 200));
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- use inner results for non-partition column
|
||||||
|
UPDATE summary_table SET uniques = NULL
|
||||||
|
WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- these should not update anything
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1 AND id = 4;
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1 AND id = 4;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- update with NULL value
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- unsupported multi-shard updates
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table) average_query;
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_value + 1 WHERE id =
|
||||||
|
(SELECT id FROM raw_table WHERE value > 100);
|
||||||
|
|
||||||
|
-- test complex queries
|
||||||
|
UPDATE summary_table
|
||||||
|
SET
|
||||||
|
uniques = metrics.expensive_uniques,
|
||||||
|
count = metrics.total_count
|
||||||
|
FROM
|
||||||
|
(SELECT
|
||||||
|
id,
|
||||||
|
count(DISTINCT (CASE WHEN value > 100 then value end)) AS expensive_uniques,
|
||||||
|
count(value) AS total_count
|
||||||
|
FROM raw_table
|
||||||
|
WHERE id = 1
|
||||||
|
GROUP BY id) metrics
|
||||||
|
WHERE
|
||||||
|
summary_table.id = metrics.id AND
|
||||||
|
summary_table.id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test joins
|
||||||
|
UPDATE summary_table SET count = count + 1 FROM raw_table
|
||||||
|
WHERE raw_table.id = summary_table.id AND summary_table.id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test with prepared statements
|
||||||
|
PREPARE prepared_update_with_subquery(int, int) AS
|
||||||
|
UPDATE summary_table SET count = count + $1 FROM raw_table
|
||||||
|
WHERE raw_table.id = summary_table.id AND summary_table.id = $2;
|
||||||
|
|
||||||
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
EXECUTE prepared_update_with_subquery(10, 1);
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test with reference tables
|
||||||
|
|
||||||
|
CREATE TABLE reference_raw_table (id bigint, value bigint);
|
||||||
|
CREATE TABLE reference_summary_table (
|
||||||
|
id bigint,
|
||||||
|
min_value numeric,
|
||||||
|
average_value numeric,
|
||||||
|
count int,
|
||||||
|
uniques int);
|
||||||
|
|
||||||
|
SELECT create_reference_table('reference_raw_table');
|
||||||
|
SELECT create_reference_table('reference_summary_table');
|
||||||
|
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 100);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 200);
|
||||||
|
INSERT INTO reference_raw_table VALUES (1, 300);
|
||||||
|
INSERT INTO reference_raw_table VALUES (2, 400);
|
||||||
|
INSERT INTO reference_raw_table VALUES (2, 500);
|
||||||
|
|
||||||
|
INSERT INTO reference_summary_table VALUES (1);
|
||||||
|
INSERT INTO reference_summary_table VALUES (2);
|
||||||
|
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
UPDATE reference_summary_table SET (min_value, average_value) =
|
||||||
|
(SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2)
|
||||||
|
WHERE id = 2;
|
||||||
|
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- no need for partition column equalities on reference tables
|
||||||
|
UPDATE reference_summary_table SET (count) =
|
||||||
|
(SELECT id AS inner_id FROM reference_raw_table WHERE value = 500)
|
||||||
|
WHERE min_value = 400;
|
||||||
|
|
||||||
|
SELECT * FROM reference_summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- can read from a reference table and update a distributed table
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
-- cannot read from a distributed table and update a reference table
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
UPDATE reference_summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 2
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1;
|
||||||
|
|
||||||
|
-- test master_modify_multiple_shards() with subqueries and expect to fail
|
||||||
|
SELECT master_modify_multiple_shards('
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 1
|
||||||
|
) average_query
|
||||||
|
WHERE id = 1');
|
||||||
|
|
||||||
|
-- test connection API via using COPY
|
||||||
|
|
||||||
|
-- COPY on SELECT part
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
\COPY raw_table FROM STDIN WITH CSV
|
||||||
|
3, 100
|
||||||
|
3, 200
|
||||||
|
\.
|
||||||
|
|
||||||
|
INSERT INTO summary_table VALUES (3);
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 3
|
||||||
|
) average_query
|
||||||
|
WHERE id = 3;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- COPY on UPDATE part
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
INSERT INTO raw_table VALUES (4, 100);
|
||||||
|
INSERT INTO raw_table VALUES (4, 200);
|
||||||
|
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
4,,,,
|
||||||
|
\.
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 4
|
||||||
|
) average_query
|
||||||
|
WHERE id = 4;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- COPY on both parts
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
\COPY raw_table FROM STDIN WITH CSV
|
||||||
|
5, 100
|
||||||
|
5, 200
|
||||||
|
\.
|
||||||
|
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
5,,,,
|
||||||
|
\.
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM raw_table WHERE id = 5
|
||||||
|
) average_query
|
||||||
|
WHERE id = 5;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- COPY on reference tables
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
\COPY reference_raw_table FROM STDIN WITH CSV
|
||||||
|
6, 100
|
||||||
|
6, 200
|
||||||
|
\.
|
||||||
|
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
6,,,,
|
||||||
|
\.
|
||||||
|
|
||||||
|
UPDATE summary_table SET average_value = average_query.average FROM (
|
||||||
|
SELECT avg(value) AS average FROM reference_raw_table WHERE id = 6
|
||||||
|
) average_query
|
||||||
|
WHERE id = 6;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test DELETE queries
|
||||||
|
SELECT * FROM raw_table ORDER BY id, value;
|
||||||
|
|
||||||
|
DELETE FROM summary_table
|
||||||
|
WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test with different syntax
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = 2;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- cannot read from a distributed table and delete from a reference table
|
||||||
|
DELETE FROM reference_summary_table USING raw_table
|
||||||
|
WHERE reference_summary_table.id = raw_table.id AND raw_table.id = 3;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test connection API by using COPY with DELETEs
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
\COPY summary_table FROM STDIN WITH CSV
|
||||||
|
1,,,,
|
||||||
|
2,,,,
|
||||||
|
\.
|
||||||
|
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = 1;
|
||||||
|
|
||||||
|
DELETE FROM summary_table USING reference_raw_table
|
||||||
|
WHERE summary_table.id = reference_raw_table.id AND reference_raw_table.id = 2;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
-- test DELETEs with prepared statements
|
||||||
|
-- prepared_delete_with_join(int): deletes the summary_table rows whose id
-- matches a raw_table row with id = $1 (join expressed via DELETE ... USING)
PREPARE prepared_delete_with_join(int) AS
|
||||||
|
DELETE FROM summary_table USING raw_table
|
||||||
|
WHERE summary_table.id = raw_table.id AND raw_table.id = $1;
|
||||||
|
|
||||||
|
INSERT INTO raw_table VALUES (6, 100);
|
||||||
|
|
||||||
|
-- execute 6 times to trigger prepared statement usage
|
||||||
|
EXECUTE prepared_delete_with_join(1);
|
||||||
|
EXECUTE prepared_delete_with_join(2);
|
||||||
|
EXECUTE prepared_delete_with_join(3);
|
||||||
|
EXECUTE prepared_delete_with_join(4);
|
||||||
|
EXECUTE prepared_delete_with_join(5);
|
||||||
|
EXECUTE prepared_delete_with_join(6);
|
||||||
|
|
||||||
|
SELECT * FROM summary_table ORDER BY id;
|
||||||
|
|
||||||
|
DROP TABLE raw_table;
|
||||||
|
DROP TABLE summary_table;
|
||||||
|
DROP TABLE reference_raw_table;
|
||||||
|
DROP TABLE reference_summary_table;
|
||||||
|
|
Loading…
Reference in New Issue