Merge pull request #2124 from citusdata/ot_msud_subquery

Support UPDATE/DELETE with joins and subqueries
pull/2075/head
Burak Velioglu 2018-05-02 17:25:07 +03:00 committed by GitHub
commit 7850f93127
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1940 additions and 270 deletions

View File

@ -310,8 +310,7 @@ AcquireExecutorMultiShardLocks(List *taskList)
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT..SELECT
* commands from having different effects on different placements.
* results on different replicas.
*/
if (RequiresConsistentSnapshot(task))
@ -330,18 +329,17 @@ AcquireExecutorMultiShardLocks(List *taskList)
/*
* RequiresConsistentSnapshot returns true if the given task need to take
* the necessary locks to ensure that a subquery in the INSERT ... SELECT
* query returns the same output for all task placements.
* the necessary locks to ensure that a subquery in the modify query
* returns the same output for all task placements.
*/
static bool
RequiresConsistentSnapshot(Task *task)
{
bool requiresIsolation = false;
if (!task->insertSelectQuery)
if (!task->modifyWithSubquery)
{
/*
* Only INSERT/SELECT commands currently require SELECT isolation.
* Other commands do not read from other shards.
*/

View File

@ -151,7 +151,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
{
bool multiShardQuery = true;
DeferredErrorMessage *error =
ModifyQuerySupported(modifyQuery, modifyQuery, multiShardQuery);
ModifyQuerySupported(modifyQuery, modifyQuery, multiShardQuery, NULL);
if (error)
{

View File

@ -58,7 +58,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
{
query = copyObject(originalQuery);
}
else if (task->insertSelectQuery)
else if (query->commandType == CMD_INSERT && task->modifyWithSubquery)
{
/* for INSERT..SELECT, adjust shard names in SELECT part */
RangeTblEntry *copiedInsertRte = NULL;

View File

@ -593,9 +593,6 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query,
ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
DistributedPlan *distributedPlan = NULL;
MultiTreeRoot *logicalPlan = NULL;
List *subPlanList = NIL;
@ -608,7 +605,7 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query,
*/
distributedPlan = CreateRouterPlan(originalQuery, query,
relationRestrictionContext);
plannerRestrictionContext);
if (distributedPlan != NULL)
{
if (distributedPlan->planningError == NULL)

View File

@ -44,8 +44,8 @@ static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
plannerRestrictionContext);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
ShardInterval *shardInterval,
RelationRestrictionContext *
restrictionContext,
PlannerRestrictionContext *
plannerRestrictionContext,
uint32 taskIdIndex,
bool allRelationsJoinedOnPartitionKey);
static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
@ -252,14 +252,14 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
Task *modifyTask = NULL;
modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval,
relationRestrictionContext,
plannerRestrictionContext,
taskIdIndex,
allDistributionKeysInQueryAreEqual);
/* add the task if it could be created */
if (modifyTask != NULL)
{
modifyTask->insertSelectQuery = true;
modifyTask->modifyWithSubquery = true;
sqlTaskList = lappend(sqlTaskList, modifyTask);
}
@ -404,7 +404,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
*/
static Task *
RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval,
RelationRestrictionContext *restrictionContext,
PlannerRestrictionContext *plannerRestrictionContext,
uint32 taskIdIndex,
bool safeToPushdownSubquery)
{
@ -417,8 +417,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
Oid distributedTableId = shardInterval->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
RelationRestrictionContext *copiedRestrictionContext =
CopyRelationRestrictionContext(restrictionContext);
PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0(
sizeof(PlannerRestrictionContext));
StringInfo queryString = makeStringInfo();
ListCell *restrictionCell = NULL;
@ -431,11 +431,22 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
List *intersectedPlacementList = NULL;
bool upsertQuery = false;
bool replacePrunedQueryWithDummy = false;
bool allReferenceTables = restrictionContext->allReferenceTables;
bool allReferenceTables =
plannerRestrictionContext->relationRestrictionContext->allReferenceTables;
List *shardOpExpressions = NIL;
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
bool multiShardModifyQuery = false;
List *relationRestrictionList = NIL;
copyOfPlannerRestrictionContext->relationRestrictionContext =
CopyRelationRestrictionContext(
plannerRestrictionContext->relationRestrictionContext);
copyOfPlannerRestrictionContext->joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
relationRestrictionList =
copyOfPlannerRestrictionContext->relationRestrictionContext->
relationRestrictionList;
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@ -444,7 +455,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* Replace the partitioning qual parameter value in all baserestrictinfos.
* Note that this has to be done on a copy, as the walker modifies in place.
*/
foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList)
foreach(restrictionCell, relationRestrictionList)
{
RelationRestriction *restriction = lfirst(restrictionCell);
List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo;
@ -493,7 +504,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* If we can, we also rely on the side-effects that all RTEs have been updated
* to point to the relevant nodes and selectPlacementList is determined.
*/
planningError = PlanRouterQuery(copiedSubquery, copiedRestrictionContext,
planningError = PlanRouterQuery(copiedSubquery, copyOfPlannerRestrictionContext,
&selectPlacementList, &selectAnchorShardId,
&relationShardList, replacePrunedQueryWithDummy,
&multiShardModifyQuery);

View File

@ -127,15 +127,12 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan);
static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext);
static List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestrictionContext *
relationRestrictionContext,
List *prunedRelationShardList, TaskType taskType);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static Task * QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext,
uint32 taskId,
TaskType taskType);
TaskType taskType,
bool modifyRequiresMasterEvaluation);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
ShardInterval *secondInterval);
@ -202,6 +199,7 @@ static uint32 FinalTargetEntryCount(List *targetEntryList);
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
ShardInterval *secondInterval);
/*
* CreatePhysicalDistributedPlan is the entry point for physical plan generation. The
* function builds the physical plan; this plan includes the list of tasks to be
@ -2008,7 +2006,8 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
plannerRestrictionContext->
relationRestrictionContext,
prunedRelationShardList, SQL_TASK);
prunedRelationShardList, SQL_TASK,
false);
}
else
{
@ -2068,7 +2067,8 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
List *
QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestrictionContext *relationRestrictionContext,
List *prunedRelationShardList, TaskType taskType)
List *prunedRelationShardList, TaskType taskType, bool
modifyRequiresMasterEvaluation)
{
List *sqlTaskList = NIL;
ListCell *restrictionCell = NULL;
@ -2168,13 +2168,25 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
subqueryTask = QueryPushdownTaskCreate(query, shardOffset,
relationRestrictionContext, taskIdIndex,
taskType);
taskType, modifyRequiresMasterEvaluation);
subqueryTask->jobId = jobId;
sqlTaskList = lappend(sqlTaskList, subqueryTask);
++taskIdIndex;
}
/* If it is a modify task with multiple tables */
if (taskType == MODIFY_TASK && list_length(
relationRestrictionContext->relationRestrictionList) > 1)
{
ListCell *taskCell = NULL;
foreach(taskCell, sqlTaskList)
{
Task *task = (Task *) lfirst(taskCell);
task->modifyWithSubquery = true;
}
}
return sqlTaskList;
}
@ -2198,6 +2210,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
uint32 relationIndex = 0;
uint32 rangeDistributedRelationCount = 0;
uint32 hashDistributedRelationCount = 0;
uint32 appendDistributedRelationCount = 0;
foreach(relationIdCell, relationIdList)
{
@ -2222,10 +2235,17 @@ ErrorIfUnsupportedShardDistribution(Query *query)
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently append partitioned relations "
"are not supported")));
DistTableCacheEntry *distTableEntry = DistributedTableCacheEntry(relationId);
if (distTableEntry->hasOverlappingShardInterval)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently append partitioned relations "
"with overlapping shard intervals are "
"not supported")));
}
appendDistributedRelationCount++;
}
}
@ -2236,6 +2256,20 @@ ErrorIfUnsupportedShardDistribution(Query *query)
errdetail("A query including both range and hash "
"partitioned relations are unsupported")));
}
else if ((rangeDistributedRelationCount > 0) && (appendDistributedRelationCount > 0))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("A query including both range and append "
"partitioned relations are unsupported")));
}
else if ((appendDistributedRelationCount > 0) && (hashDistributedRelationCount > 0))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("A query including both append and hash "
"partitioned relations are unsupported")));
}
foreach(relationIdCell, nonReferenceRelations)
{
@ -2273,7 +2307,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
static Task *
QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, uint32 taskId,
TaskType taskType)
TaskType taskType, bool modifyRequiresMasterEvaluation)
{
Query *taskQuery = copyObject(originalQuery);
@ -2285,6 +2319,20 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
List *selectPlacementList = NIL;
uint64 jobId = INVALID_JOB_ID;
uint64 anchorShardId = INVALID_SHARD_ID;
bool modifyWithSubselect = false;
RangeTblEntry *resultRangeTable = NULL;
Oid resultRelationOid = InvalidOid;
/*
* If it is a modify query with sub-select, we need to set result relation shard's id
* as anchor shard id.
*/
if (UpdateOrDeleteQuery(originalQuery))
{
resultRangeTable = rt_fetch(originalQuery->resultRelation, originalQuery->rtable);
resultRelationOid = resultRangeTable->relid;
modifyWithSubselect = true;
}
/*
* Find the relevant shard out of each relation for this task.
@ -2310,12 +2358,19 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
anchorShardId = shardInterval->shardId;
}
}
else if (UpdateOrDeleteQuery(originalQuery))
{
shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
if (!modifyWithSubselect || relationId == resultRelationOid)
{
/* for UPDATE/DELETE the shard in the result relation becomes the anchor shard */
anchorShardId = shardInterval->shardId;
}
}
else
{
/* use the shard from a specific index */
/* for SELECT we pick an arbitrary shard as the anchor shard */
shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
/* use a shard from a distributed table as the anchor shard */
anchorShardId = shardInterval->shardId;
}
@ -2328,6 +2383,8 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
relationShardList = lappend(relationShardList, relationShard);
}
Assert(anchorShardId != INVALID_SHARD_ID);
selectPlacementList = WorkersContainingAllShards(taskShardList);
if (list_length(selectPlacementList) == 0)
{
@ -2346,14 +2403,22 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
* that the query string is generated as (...) AND (...) as opposed to
* (...), (...).
*/
taskQuery->jointree->quals =
(Node *) make_ands_explicit((List *) taskQuery->jointree->quals);
if (taskQuery->jointree->quals != NULL && IsA(taskQuery->jointree->quals, List))
{
taskQuery->jointree->quals = (Node *) make_ands_explicit(
(List *) taskQuery->jointree->quals);
}
/* and generate the full query string */
pg_get_query_def(taskQuery, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL);
if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) ||
taskType == SQL_TASK)
{
pg_get_query_def(taskQuery, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
subqueryTask->queryString = queryString->data;
}
subqueryTask = CreateBasicTask(jobId, taskId, taskType, queryString->data);
subqueryTask->dependedTaskList = NULL;
subqueryTask->anchorShardId = anchorShardId;
subqueryTask->taskPlacementList = selectPlacementList;

View File

@ -38,6 +38,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/listutils.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
@ -108,9 +109,13 @@ bool EnableRouterExecution = true;
/* planner functions forward declarations */
static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
Query *query,
RelationRestrictionContext *
restrictionContext);
PlannerRestrictionContext *
plannerRestrictionContext);
static bool IsTidColumn(Node *node);
static DeferredErrorMessage * MultiShardModifyQuerySupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
bool *badCoalesce);
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
@ -124,7 +129,7 @@ static bool CanShardPrune(Oid distributedTableId, Query *query);
static Job * CreateJob(Query *query);
static Task * CreateTask(TaskType taskType);
static Job * RouterJob(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError);
static bool RelationPrunesToMultipleShards(List *relationShardList);
static void NormalizeMultiRowInsertTargetList(Query *query);
@ -147,8 +152,7 @@ static List * SingleShardSelectTaskList(Query *query, List *relationShardList,
List *placementList, uint64 shardId);
static List * SingleShardModifyTaskList(Query *query, List *relationShardList,
List *placementList, uint64 shardId);
static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShardList,
bool requiresMasterEvaluation);
/*
* CreateRouterPlan attempts to create a router executor plan for the given
@ -157,12 +161,13 @@ static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShard
*/
DistributedPlan *
CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
PlannerRestrictionContext *plannerRestrictionContext)
{
if (MultiRouterPlannableQuery(query, restrictionContext))
if (MultiRouterPlannableQuery(query,
plannerRestrictionContext->relationRestrictionContext))
{
return CreateSingleTaskRouterPlan(originalQuery, query,
restrictionContext);
plannerRestrictionContext);
}
/*
@ -189,7 +194,8 @@ CreateModifyPlan(Query *originalQuery, Query *query,
distributedPlan->operation = query->commandType;
distributedPlan->planningError = ModifyQuerySupported(query, originalQuery,
multiShardQuery);
multiShardQuery,
plannerRestrictionContext);
if (distributedPlan->planningError != NULL)
{
return distributedPlan;
@ -197,10 +203,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
if (UpdateOrDeleteQuery(query))
{
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
job = RouterJob(originalQuery, restrictionContext,
job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
}
else
@ -238,7 +241,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
*/
static DistributedPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
PlannerRestrictionContext *plannerRestrictionContext)
{
Job *job = NULL;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
@ -253,7 +256,8 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
}
/* we cannot have multi shard update/delete query via this code path */
job = RouterJob(originalQuery, restrictionContext, &distributedPlan->planningError);
job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
if (distributedPlan->planningError)
{
@ -516,7 +520,8 @@ IsTidColumn(Node *node)
* on the rewritten query.
*/
DeferredErrorMessage *
ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery)
ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
uint32 rangeTableId = 1;
@ -687,23 +692,31 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
/*
* Reject queries which involve joins. Note that UPSERTs are exceptional for this case.
* Queries like "INSERT INTO table_name ON CONFLICT DO UPDATE (col) SET other_col = ''"
* contains two range table entries, and we have to allow them.
* We have to allow modify queries with two range table entries, if it is pushdownable.
*/
if (commandType != CMD_INSERT && queryTableCount != 1)
if (commandType != CMD_INSERT)
{
/*
* We support UPDATE and DELETE with joins unless they are multi shard
* queries.
*/
if (!UpdateOrDeleteQuery(queryTree) || multiShardQuery)
/* We can not get restriction context via master_modify_multiple_shards path */
if (plannerRestrictionContext == NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot perform distributed planning for the given "
"modification",
"Joins are not supported in distributed "
"modifications.", NULL);
if (queryTableCount != 1)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join",
"Execute the query without using master_modify_multiple_shards()",
NULL);
}
}
/* If it is a multi-shard modify query with multiple tables */
else if (multiShardQuery)
{
DeferredErrorMessage *errorMessage = MultiShardModifyQuerySupported(
originalQuery, plannerRestrictionContext);
if (errorMessage != NULL)
{
return errorMessage;
}
}
}
@ -898,6 +911,114 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
}
/*
* MultiShardModifyQuerySupported returns the error message if the modify query is
* not pushdownable, otherwise it returns NULL.
*/
static DeferredErrorMessage *
MultiShardModifyQuerySupported(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
DeferredErrorMessage *errorMessage = NULL;
RangeTblEntry *resultRangeTable = rt_fetch(originalQuery->resultRelation,
originalQuery->rtable);
Oid resultRelationOid = resultRangeTable->relid;
char resultPartitionMethod = PartitionMethod(resultRelationOid);
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
{
errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"a join with USING causes an internal naming conflict, use "
"ON instead",
NULL, NULL);
}
else if (resultPartitionMethod == DISTRIBUTE_BY_NONE)
{
errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"only reference tables may be queried when targeting "
"a reference table with multi shard UPDATE/DELETE queries "
"with multiple tables ",
NULL, NULL);
}
else
{
errorMessage = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext);
}
return errorMessage;
}
/*
* HasDangerousJoinUsing search jointree for unnamed JOIN USING. Check the
* implementation of has_dangerous_join_using in ruleutils.
*/
static bool
HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
{
if (IsA(joinTreeNode, RangeTblRef))
{
/* nothing to do here */
}
else if (IsA(joinTreeNode, FromExpr))
{
FromExpr *fromExpr = (FromExpr *) joinTreeNode;
ListCell *listCell;
foreach(listCell, fromExpr->fromlist)
{
if (HasDangerousJoinUsing(rtableList, (Node *) lfirst(listCell)))
{
return true;
}
}
}
else if (IsA(joinTreeNode, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) joinTreeNode;
/* Is it an unnamed JOIN with USING? */
if (joinExpr->alias == NULL && joinExpr->usingClause)
{
/*
* Yes, so check each join alias var to see if any of them are not
* simple references to underlying columns. If so, we have a
* dangerous situation and must pick unique aliases.
*/
RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList);
ListCell *listCell;
foreach(listCell, joinRTE->joinaliasvars)
{
Var *aliasVar = (Var *) lfirst(listCell);
if (aliasVar != NULL && !IsA(aliasVar, Var))
{
return true;
}
}
}
/* Nope, but inspect children */
if (HasDangerousJoinUsing(rtableList, joinExpr->larg))
{
return true;
}
if (HasDangerousJoinUsing(rtableList, joinExpr->rarg))
{
return true;
}
}
else
{
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(joinTreeNode));
}
return false;
}
/*
* UpdateOrDeleteQuery checks if the given query is an UPDATE or DELETE command.
* If it is, it returns true otherwise it returns false.
@ -1351,7 +1472,7 @@ CreateTask(TaskType taskType)
task->upsertQuery = false;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->insertSelectQuery = false;
task->modifyWithSubquery = false;
task->relationShardList = NIL;
return task;
@ -1393,7 +1514,7 @@ ExtractFirstDistributedTableId(Query *query)
* multiple shard update/delete queries.
*/
static Job *
RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError)
{
Job *job = NULL;
@ -1412,7 +1533,7 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
/* check if this query requires master evaluation */
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
(*planningError) = PlanRouterQuery(originalQuery, restrictionContext,
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
&placementList, &shardId, &relationShardList,
replacePrunedQueryWithDummy,
&isMultiShardModifyQuery);
@ -1446,7 +1567,10 @@ RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
}
else if (isMultiShardModifyQuery)
{
job->taskList = MultiShardModifyTaskList(originalQuery, relationShardList,
job->taskList = QueryPushdownSqlTaskList(originalQuery, 0,
plannerRestrictionContext->
relationRestrictionContext,
relationShardList, MODIFY_TASK,
requiresMasterEvaluation);
}
else
@ -1482,46 +1606,6 @@ SingleShardSelectTaskList(Query *query, List *relationShardList, List *placement
}
/*
* MultiShardModifyTaskList generates task list for multi shard update/delete
* queries.
*/
static List *
MultiShardModifyTaskList(Query *originalQuery, List *relationShardList,
bool requiresMasterEvaluation)
{
List *taskList = NIL;
ListCell *relationShardCell = NULL;
int taskId = 1;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
List *relationShardList = list_make1(relationShard);
Task *task = CreateTask(MODIFY_TASK);
if (!requiresMasterEvaluation)
{
Query *copiedQuery = copyObject(originalQuery);
StringInfo shardQueryString = makeStringInfo();
UpdateRelationToShardNames((Node *) copiedQuery, relationShardList);
pg_get_query_def(copiedQuery, shardQueryString);
task->queryString = shardQueryString->data;
}
task->taskId = taskId++;
task->anchorShardId = relationShard->shardId;
task->relationShardList = relationShardList;
taskList = lappend(taskList, task);
}
return taskList;
}
/*
* SingleShardModifyTaskList generates a task for single shard update/delete query
* and returns it as a list.
@ -1657,7 +1741,8 @@ SelectsFromDistributedTable(List *rangeTableList)
* 0 values in UpdateRelationToShardNames.
*/
DeferredErrorMessage *
PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
PlanRouterQuery(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext,
List **placementList, uint64 *anchorShardId, List **relationShardList,
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery)
{
@ -1671,11 +1756,12 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
bool shardsPresent = false;
uint64 shardId = INVALID_SHARD_ID;
CmdType commandType = originalQuery->commandType;
bool isMultiShardModifyQuery = false;
*placementList = NIL;
prunedRelationShardList = TargetShardIntervalsForQuery(originalQuery,
restrictionContext,
plannerRestrictionContext->
relationRestrictionContext,
&isMultiShardQuery);
if (isMultiShardQuery)
@ -1695,13 +1781,23 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
Assert(UpdateOrDeleteQuery(originalQuery));
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery);
isMultiShardQuery,
plannerRestrictionContext);
if (planningError != NULL)
{
return planningError;
}
isMultiShardModifyQuery = true;
else
{
/*
* If the modify query uses multiple shards and update/delete query, relation
* shard list should be returned as list of shard list for each table. Check
* the implementation of QueryPushdownSqlTaskList.
*/
*relationShardList = prunedRelationShardList;
*multiShardModifyQuery = true;
return planningError;
}
}
foreach(prunedRelationShardListCell, prunedRelationShardList)
@ -1729,12 +1825,6 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
}
}
if (isMultiShardModifyQuery)
{
*multiShardModifyQuery = true;
return planningError;
}
/*
* We bail out if there are RTEs that prune multiple shards above, but
* there can also be multiple RTEs that reference the same relation.

View File

@ -233,6 +233,16 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *
context);
}
/* ExecInitExpr cannot handle PARAM_SUBLINK */
if (IsA(expression, Param))
{
Param *param = (Param *) expression;
if (param->paramkind == PARAM_SUBLINK)
{
return expression;
}
}
if (IsA(expression, Var))
{
context->containsVar = true;
@ -243,6 +253,15 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *
context);
}
/* expression_tree_mutator does not descend into Query trees */
if (IsA(expression, Query))
{
Query *query = (Query *) expression;
return (Node *) query_tree_mutator(query, PartiallyEvaluateExpressionMutator,
context, 0);
}
copy = expression_tree_mutator(expression,
PartiallyEvaluateExpressionMutator,
&localContext);

View File

@ -245,7 +245,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(taskExecution);
COPY_SCALAR_FIELD(upsertQuery);
COPY_SCALAR_FIELD(replicationModel);
COPY_SCALAR_FIELD(insertSelectQuery);
COPY_SCALAR_FIELD(modifyWithSubquery);
COPY_NODE_FIELD(relationShardList);
COPY_NODE_FIELD(rowValuesLists);
}

View File

@ -454,7 +454,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_BOOL_FIELD(modifyWithSubquery);
WRITE_NODE_FIELD(relationShardList);
WRITE_NODE_FIELD(rowValuesLists);
}

View File

@ -367,7 +367,7 @@ ReadTask(READFUNC_ARGS)
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(insertSelectQuery);
READ_BOOL_FIELD(modifyWithSubquery);
READ_NODE_FIELD(relationShardList);
READ_NODE_FIELD(rowValuesLists);

View File

@ -158,6 +158,10 @@ typedef struct MapMergeJob
*
* NB: Changing this requires also changing _outTask in citus_outfuncs and _readTask
* in citus_readfuncs to correctly (de)serialize this struct.
*
* INSERT ... SELECT queries and modify queries with subqueries or multiple tables
* set modifyWithSubquery to true. We need to use it to take the necessary locks
* to get consistent results for subqueries.
*/
typedef struct TaskExecution TaskExecution;
@ -180,7 +184,7 @@ typedef struct Task
bool upsertQuery; /* only applies to modify tasks */
char replicationModel; /* only applies to modify tasks */
bool insertSelectQuery;
bool modifyWithSubquery;
List *relationShardList;
List *rowValuesLists; /* rows to use when building multi-row INSERT */
@ -329,5 +333,12 @@ extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList);
/* function declaration for creating Task */
extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
RelationRestrictionContext *
relationRestrictionContext,
List *prunedRelationShardList, TaskType taskType,
bool modifyRequiresMasterEvaluation);
#endif /* MULTI_PHYSICAL_PLANNER_H */

View File

@ -27,13 +27,14 @@
extern bool EnableRouterExecution;
extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext);
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
RelationRestrictionContext *
restrictionContext,
PlannerRestrictionContext *
plannerRestrictionContext,
List **placementList, uint64 *anchorShardId,
List **relationShardList, bool
replacePrunedQueryWithDummy,
@ -45,7 +46,9 @@ extern List * TargetShardIntervalsForQuery(Query *query,
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery,
bool multiShardQuery);
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
extern RelationRestrictionContext * CopyRelationRestrictionContext(
RelationRestrictionContext *oldContext);
@ -58,6 +61,7 @@ extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
ShardInterval *shardInterval);
extern bool UpdateOrDeleteQuery(Query *query);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -0,0 +1,127 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-insert_to_events_test_table s2-commit s1-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
step s1-insert_to_events_test_table:
INSERT INTO events_test_table VALUES(4,6,8,10);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insert_to_events_test_table: <... completed>
step s1-commit:
COMMIT;
starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-update_events_test_table s2-commit s1-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
step s1-update_events_test_table:
UPDATE users_test_table SET value_1 = 3;
<waiting ...>
step s2-commit:
COMMIT;
step s1-update_events_test_table: <... completed>
step s1-commit:
COMMIT;
starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-delete_events_test_table s2-commit s1-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
step s1-delete_events_test_table:
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
<waiting ...>
step s2-commit:
COMMIT;
step s1-delete_events_test_table: <... completed>
step s1-commit:
COMMIT;
starting permutation: s1-begin s2-begin s1-insert_to_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-insert_to_events_test_table:
INSERT INTO events_test_table VALUES(4,6,8,10);
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
<waiting ...>
step s1-commit:
COMMIT;
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin s2-begin s1-update_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update_events_test_table:
UPDATE users_test_table SET value_1 = 3;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
<waiting ...>
step s1-commit:
COMMIT;
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;
starting permutation: s1-begin s2-begin s1-delete_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-delete_events_test_table:
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
<waiting ...>
step s1-commit:
COMMIT;
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;

View File

@ -816,9 +816,42 @@ Custom Scan (Citus Router)
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360044 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360044 lineitem_hash_part
SET citus.explain_all_tasks TO off;
-- Test update with subquery
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
FROM orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360041 lineitem_hash_part
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360045 orders_hash_part
-- Test delete with subquery
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part
USING orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360045 orders_hash_part
-- Test track tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
@ -1128,7 +1161,7 @@ SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey)
ORDER BY s;
Custom Scan (Citus Router)
Output: remote_scan.l_orderkey
-> Distributed Subplan 55_1
-> Distributed Subplan 57_1
-> HashAggregate
Output: remote_scan.l_orderkey
Group Key: remote_scan.l_orderkey
@ -1143,7 +1176,7 @@ Custom Scan (Citus Router)
Group Key: lineitem_hash_part.l_orderkey
-> Seq Scan on public.lineitem_hash_part_360041 lineitem_hash_part
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-> Distributed Subplan 55_2
-> Distributed Subplan 57_2
-> Function Scan on pg_catalog.generate_series s
Output: s
Function Call: generate_series(1, 10)
@ -1159,13 +1192,13 @@ Custom Scan (Citus Router)
Sort Key: intermediate_result.s
-> Function Scan on pg_catalog.read_intermediate_result intermediate_result
Output: intermediate_result.s
Function Call: read_intermediate_result('55_2'::text, 'binary'::citus_copy_format)
Function Call: read_intermediate_result('57_2'::text, 'binary'::citus_copy_format)
-> Sort
Output: intermediate_result_1.l_orderkey
Sort Key: intermediate_result_1.l_orderkey
-> Function Scan on pg_catalog.read_intermediate_result intermediate_result_1
Output: intermediate_result_1.l_orderkey
Function Call: read_intermediate_result('55_1'::text, 'binary'::citus_copy_format)
Function Call: read_intermediate_result('57_1'::text, 'binary'::citus_copy_format)
SELECT true AS valid FROM explain_json($$
WITH result AS (
SELECT l_quantity, count(*) count_quantity FROM lineitem

View File

@ -816,9 +816,42 @@ Custom Scan (Citus Router)
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_hash_part_360044 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360044 lineitem_hash_part
SET citus.explain_all_tasks TO off;
-- Test update with subquery
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
FROM orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Update on lineitem_hash_part_360041 lineitem_hash_part
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360045 orders_hash_part
-- Test delete with subquery
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part
USING orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360045 orders_hash_part
-- Test track tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
@ -1128,7 +1161,7 @@ SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey)
ORDER BY s;
Custom Scan (Citus Router)
Output: remote_scan.l_orderkey
-> Distributed Subplan 55_1
-> Distributed Subplan 57_1
-> HashAggregate
Output: remote_scan.l_orderkey
Group Key: remote_scan.l_orderkey
@ -1143,7 +1176,7 @@ Custom Scan (Citus Router)
Group Key: lineitem_hash_part.l_orderkey
-> Seq Scan on public.lineitem_hash_part_360041 lineitem_hash_part
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-> Distributed Subplan 55_2
-> Distributed Subplan 57_2
-> Function Scan on pg_catalog.generate_series s
Output: s
Function Call: generate_series(1, 10)
@ -1159,13 +1192,13 @@ Custom Scan (Citus Router)
Sort Key: intermediate_result.s
-> Function Scan on pg_catalog.read_intermediate_result intermediate_result
Output: intermediate_result.s
Function Call: read_intermediate_result('55_2'::text, 'binary'::citus_copy_format)
Function Call: read_intermediate_result('57_2'::text, 'binary'::citus_copy_format)
-> Sort
Output: intermediate_result_1.l_orderkey
Sort Key: intermediate_result_1.l_orderkey
-> Function Scan on pg_catalog.read_intermediate_result intermediate_result_1
Output: intermediate_result_1.l_orderkey
Function Call: read_intermediate_result('55_1'::text, 'binary'::citus_copy_format)
Function Call: read_intermediate_result('57_1'::text, 'binary'::citus_copy_format)
SELECT true AS valid FROM explain_json($$
WITH result AS (
SELECT l_quantity, count(*) count_quantity FROM lineitem

View File

@ -964,12 +964,10 @@ SELECT * FROM summary_table ORDER BY id;
-- unsupported multi-shard updates
UPDATE summary_table SET average_value = average_query.average FROM (
SELECT avg(value) AS average FROM raw_table) average_query;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE summary_table SET average_value = average_value + 1 WHERE id =
(SELECT id FROM raw_table WHERE value > 100);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- test complex queries
UPDATE summary_table
SET
@ -1108,8 +1106,8 @@ SELECT master_modify_multiple_shards('
SELECT avg(value) AS average FROM raw_table WHERE id = 1
) average_query
WHERE id = 1');
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join
DETAIL: Execute the query without using master_modify_multiple_shards()
-- test connection API via using COPY
-- COPY on SELECT part
BEGIN;

View File

@ -84,8 +84,8 @@ SELECT master_create_worker_shards('temp_nations', 4, 2);
(1 row)
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join
DETAIL: Execute the query without using master_modify_multiple_shards()
-- commands with a RETURNING clause are unsupported
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *');
ERROR: master_modify_multiple_shards() does not support RETURNING
@ -222,8 +222,8 @@ SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_key=3
ERROR: modifying the partition value of rows is not allowed
-- UPDATEs with a FROM clause are unsupported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name = ''FAIL'' FROM temp_nations WHERE multi_shard_modify_test.t_key = 3 AND multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''dummy'' ');
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join
DETAIL: Execute the query without using master_modify_multiple_shards()
-- commands with a RETURNING clause are unsupported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *');
ERROR: master_modify_multiple_shards() does not support RETURNING

View File

@ -169,6 +169,10 @@ CREATE TABLE append_stage_table(id int, col_2 int);
INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE append_stage_table_2(id int, col_2 int);
INSERT INTO append_stage_table_2 VALUES(8,3);
INSERT INTO append_stage_table_2 VALUES(9,2);
INSERT INTO append_stage_table_2 VALUES(10,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
create_distributed_table
@ -194,7 +198,7 @@ SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
1440011
(1 row)
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port);
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.00533333
@ -204,15 +208,16 @@ UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
id | col_2
----+-------
1 | 5
3 | 5
5 | 5
8 | 5
9 | 5
10 | 5
1 | 5
3 | 5
5 | 5
(6 rows)
DROP TABLE append_stage_table;
DROP TABLE append_stage_table_2;
DROP TABLE test_append_table;
-- Update multi shard of partitioned distributed table
SET citus.multi_shard_modify_mode to 'parallel';
@ -317,43 +322,24 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
(5 rows)
DROP TABLE tt2;
-- Multiple RTEs are not supported
-- Multiple RTEs are only supported if subquery is pushdownable
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 0 | 13 | 0
-- To test colocation between tables in modify query
SET citus.shard_count to 6;
CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table_2', 'user_id');
create_distributed_table
--------------------------
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
\COPY events_test_table_2 FROM STDIN DELIMITER AS ',';
CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int);
\COPY events_test_table_local FROM STDIN DELIMITER AS ',';
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
NOTICE: Copying data from local table...
create_distributed_table
@ -361,11 +347,416 @@ NOTICE: Copying data from local table...
(1 row)
-- We can pushdown query if there is partition key equality
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
DELETE FROM users_test_table
USING events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table
SET value_1 = 3
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM users_test_table
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM events_test_table_2
WHERE now() > (SELECT max(date_col)
FROM test_table_1
WHERE test_table_1.id = events_test_table_2.user_id
GROUP BY id)
RETURNING *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
1 | 5 | 7 | 7
1 | 20 | 12 | 25
1 | 60 | 17 | 17
(3 rows)
UPDATE users_test_table
SET value_1 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id
AND events_test_table.user_id > 5;
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table);
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table) returning *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
20 | 4 | | 0
20 | 4 | | 0
20 | 4 | | 0
4 | 4 | 9 | 0
4 | 4 | 17 | 0
16 | 4 | | 0
6 | 4 | 11 | 0
6 | 4 | 15 | 0
2 | 4 | 7 | 0
2 | 4 | 19 | 0
(11 rows)
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION ALL
SELECT user_id
FROM events_test_table) returning *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
20 | 4 | | 0
20 | 4 | | 0
20 | 4 | | 0
4 | 4 | 9 | 0
4 | 4 | 17 | 0
16 | 4 | | 0
6 | 4 | 11 | 0
6 | 4 | 15 | 0
2 | 4 | 7 | 0
2 | 4 | 19 | 0
(11 rows)
UPDATE users_test_table
SET value_1 = 5
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id
GROUP BY
user_id
);
UPDATE users_test_table
SET value_3 = 1
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id AND
users_test_table.value_2 > events_test_table.value_2
GROUP BY
user_id
);
UPDATE users_test_table
SET value_2 = 4
WHERE
value_1 > 1 AND value_1 < 3
AND value_2 >= 1
AND user_id IN
(
SELECT
e1.user_id
FROM (
SELECT
user_id,
1 AS view_homepage
FROM events_test_table
WHERE
value_1 IN (0, 1)
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo
FROM events_test_table
WHERE
user_id = e1.user_id
) e2 ON true
);
UPDATE users_test_table
SET value_3 = 5
WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id);
-- Test it within transaction
BEGIN;
INSERT INTO users_test_table
SELECT * FROM events_test_table
WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5;
SELECT SUM(value_2) FROM users_test_table;
sum
-----
169
(1 row)
UPDATE users_test_table
SET value_2 = 1
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
SELECT SUM(value_2) FROM users_test_table;
sum
-----
97
(1 row)
COMMIT;
-- Test with schema
CREATE SCHEMA sec_schema;
CREATE TABLE sec_schema.tt1(id int, value_1 int);
SELECT create_distributed_table('sec_schema.tt1','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9);
UPDATE sec_schema.tt1
SET value_1 = 11
WHERE id < (SELECT max(value_2) FROM events_test_table_2
WHERE sec_schema.tt1.id = events_test_table_2.user_id
GROUP BY user_id)
RETURNING *;
id | value_1
----+---------
7 | 11
9 | 11
(2 rows)
DROP SCHEMA sec_schema CASCADE;
NOTICE: drop cascades to table sec_schema.tt1
-- We don't need partition key equality with reference tables
UPDATE events_test_table
SET value_2 = 5
FROM users_reference_copy_table
WHERE users_reference_copy_table.user_id = events_test_table.value_1;
-- Both reference tables and hash distributed tables can be used in subquery
UPDATE events_test_table as ett
SET value_2 = 6
WHERE ett.value_3 IN (SELECT utt.value_3
FROM users_test_table as utt, users_reference_copy_table as uct
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
-- We don't need equality check with constant values in sub-select
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions
UPDATE test_table_1
SET col_3 = 6
WHERE date_col IN (SELECT now());
ERROR: cannot push down this subquery
DETAIL: Subqueries without a FROM clause can only contain immutable functions
-- Test with prepared statements
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count
-------
0
(1 row)
PREPARE foo_plan_2(int,int) AS UPDATE users_test_table
SET value_1 = $1, value_3 = $2
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
EXECUTE foo_plan_2(1,5);
EXECUTE foo_plan_2(3,15);
EXECUTE foo_plan_2(5,25);
EXECUTE foo_plan_2(7,35);
EXECUTE foo_plan_2(9,45);
EXECUTE foo_plan_2(0,0);
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count
-------
6
(1 row)
-- Test with varying WHERE expressions
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id OR FALSE;
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
-- Test with inactive shard-placement
-- manually set shardstate of one placement of users_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
-- manually set shardstate of one placement of events_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
-- Subquery must return single value to use it with comparison operators
UPDATE users_test_table as utt
SET value_1 = 3
WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id);
ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637
-- We can not pushdown a query if the target relation is reference table
UPDATE users_reference_copy_table
SET value_2 = 5
FROM events_test_table
WHERE users_reference_copy_table.user_id = events_test_table.user_id;
ERROR: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables
-- We cannot push down it if the query has outer join and using
UPDATE events_test_table
SET value_2 = users_test_table.user_id
FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
ERROR: a join with USING causes an internal naming conflict, use ON instead
-- We can not pushdown query if there is no partition key equality
UPDATE users_test_table
SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1)
FROM events_test_table
GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = (SELECT Count(*)
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT value_1
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
INTERSECT
SELECT Sum(value_1)
FROM events_test_table
GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_2 = (SELECT value_3
FROM users_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_2 = 2
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id > events_test_table.user_id AND
users_test_table.value_1 = events_test_table.value_1
GROUP BY
user_id
);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET (value_1, value_2) = (2,1)
WHERE user_id IN
(SELECT user_id
FROM users_test_table
INTERSECT
SELECT user_id
FROM events_test_table);
ERROR: cannot push down this subquery
DETAIL: Intersect and Except are currently unsupported
-- Reference tables can not locate on the outer part of the outer join
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN
(SELECT DISTINCT e2.user_id
FROM users_reference_copy_table
LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- Volatile functions are also not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id * random();
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
UPDATE users_test_table
SET value_2 = 5 * random()
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Local tables are not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id;
ERROR: relation events_test_table_local is not distributed
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
ERROR: relation events_test_table_local is not distributed
UPDATE events_test_table_local
SET value_2 = 5
FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id;
ERROR: relation events_test_table_local is not distributed
-- Shard counts of tables must be equal to pushdown the query
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_2
WHERE users_test_table.user_id = events_test_table_2.user_id;
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Should error out due to multiple row return from subquery, but we can not get this information within
-- subquery pushdown planner. This query will be sent to worker with recursive planner.
DELETE FROM users_test_table
WHERE users_test_table.user_id = (SELECT user_id
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 5
3 | Wed Jan 12 08:35:19 2011 PST | 9
3 | Mon Jan 12 08:35:19 2111 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 7
(3 rows)
@ -374,15 +765,16 @@ SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 3
3 | Wed Jan 12 08:35:19 2011 PST | 3
3 | Mon Jan 12 08:35:19 2111 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 3
(3 rows)
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
SELECT * FROM test_table_1;
id | date_col | col_3
----+----------+-------
(0 rows)
id | date_col | col_3
----+------------------------------+-------
3 | Mon Jan 12 08:35:19 2111 PST | 9
(1 row)
DROP TABLE test_table_1;
-- Volatile functions are not supported

View File

@ -169,6 +169,10 @@ CREATE TABLE append_stage_table(id int, col_2 int);
INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE append_stage_table_2(id int, col_2 int);
INSERT INTO append_stage_table_2 VALUES(8,3);
INSERT INTO append_stage_table_2 VALUES(9,2);
INSERT INTO append_stage_table_2 VALUES(10,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
create_distributed_table
@ -194,7 +198,7 @@ SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
1440011
(1 row)
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port);
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.00533333
@ -204,15 +208,16 @@ UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
id | col_2
----+-------
1 | 5
3 | 5
5 | 5
8 | 5
9 | 5
10 | 5
1 | 5
3 | 5
5 | 5
(6 rows)
DROP TABLE append_stage_table;
DROP TABLE append_stage_table_2;
DROP TABLE test_append_table;
-- Update multi shard of partitioned distributed table
SET citus.multi_shard_modify_mode to 'parallel';
@ -340,43 +345,24 @@ UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
(5 rows)
DROP TABLE tt2;
-- Multiple RTEs are not supported
-- Multiple RTEs are only supported if subquery is pushdownable
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 0 | 13 | 0
-- To test colocation between tables in modify query
SET citus.shard_count to 6;
CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table_2', 'user_id');
create_distributed_table
--------------------------
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
\COPY events_test_table_2 FROM STDIN DELIMITER AS ',';
CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int);
\COPY events_test_table_local FROM STDIN DELIMITER AS ',';
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
NOTICE: Copying data from local table...
create_distributed_table
@ -384,11 +370,416 @@ NOTICE: Copying data from local table...
(1 row)
-- We can pushdown query if there is partition key equality
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
DELETE FROM users_test_table
USING events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table
SET value_1 = 3
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM users_test_table
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM events_test_table_2
WHERE now() > (SELECT max(date_col)
FROM test_table_1
WHERE test_table_1.id = events_test_table_2.user_id
GROUP BY id)
RETURNING *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
1 | 5 | 7 | 7
1 | 20 | 12 | 25
1 | 60 | 17 | 17
(3 rows)
UPDATE users_test_table
SET value_1 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id
AND events_test_table.user_id > 5;
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table);
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table) returning *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
20 | 4 | | 0
20 | 4 | | 0
20 | 4 | | 0
4 | 4 | 9 | 0
4 | 4 | 17 | 0
16 | 4 | | 0
6 | 4 | 11 | 0
6 | 4 | 15 | 0
2 | 4 | 7 | 0
2 | 4 | 19 | 0
(11 rows)
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION ALL
SELECT user_id
FROM events_test_table) returning *;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
20 | 4 | | 0
20 | 4 | | 0
20 | 4 | | 0
4 | 4 | 9 | 0
4 | 4 | 17 | 0
16 | 4 | | 0
6 | 4 | 11 | 0
6 | 4 | 15 | 0
2 | 4 | 7 | 0
2 | 4 | 19 | 0
(11 rows)
UPDATE users_test_table
SET value_1 = 5
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id
GROUP BY
user_id
);
UPDATE users_test_table
SET value_3 = 1
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id AND
users_test_table.value_2 > events_test_table.value_2
GROUP BY
user_id
);
UPDATE users_test_table
SET value_2 = 4
WHERE
value_1 > 1 AND value_1 < 3
AND value_2 >= 1
AND user_id IN
(
SELECT
e1.user_id
FROM (
SELECT
user_id,
1 AS view_homepage
FROM events_test_table
WHERE
value_1 IN (0, 1)
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo
FROM events_test_table
WHERE
user_id = e1.user_id
) e2 ON true
);
UPDATE users_test_table
SET value_3 = 5
WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id);
-- Test it within transaction
BEGIN;
INSERT INTO users_test_table
SELECT * FROM events_test_table
WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5;
SELECT SUM(value_2) FROM users_test_table;
sum
-----
169
(1 row)
UPDATE users_test_table
SET value_2 = 1
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
SELECT SUM(value_2) FROM users_test_table;
sum
-----
97
(1 row)
COMMIT;
-- Test with schema
CREATE SCHEMA sec_schema;
CREATE TABLE sec_schema.tt1(id int, value_1 int);
SELECT create_distributed_table('sec_schema.tt1','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9);
UPDATE sec_schema.tt1
SET value_1 = 11
WHERE id < (SELECT max(value_2) FROM events_test_table_2
WHERE sec_schema.tt1.id = events_test_table_2.user_id
GROUP BY user_id)
RETURNING *;
id | value_1
----+---------
7 | 11
9 | 11
(2 rows)
DROP SCHEMA sec_schema CASCADE;
NOTICE: drop cascades to table sec_schema.tt1
-- We don't need partition key equality with reference tables
UPDATE events_test_table
SET value_2 = 5
FROM users_reference_copy_table
WHERE users_reference_copy_table.user_id = events_test_table.value_1;
-- Both reference tables and hash distributed tables can be used in subquery
UPDATE events_test_table as ett
SET value_2 = 6
WHERE ett.value_3 IN (SELECT utt.value_3
FROM users_test_table as utt, users_reference_copy_table as uct
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
-- We don't need equality check with constant values in sub-select
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions
UPDATE test_table_1
SET col_3 = 6
WHERE date_col IN (SELECT now());
ERROR: cannot push down this subquery
DETAIL: Subqueries without a FROM clause can only contain immutable functions
-- Test with prepared statements
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count
-------
0
(1 row)
PREPARE foo_plan_2(int,int) AS UPDATE users_test_table
SET value_1 = $1, value_3 = $2
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
EXECUTE foo_plan_2(1,5);
EXECUTE foo_plan_2(3,15);
EXECUTE foo_plan_2(5,25);
EXECUTE foo_plan_2(7,35);
EXECUTE foo_plan_2(9,45);
EXECUTE foo_plan_2(0,0);
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count
-------
6
(1 row)
-- Test with varying WHERE expressions
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id OR FALSE;
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
-- Test with inactive shard-placement
-- manually set shardstate of one placement of users_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
-- manually set shardstate of one placement of events_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
-- Subquery must return single value to use it with comparison operators
UPDATE users_test_table as utt
SET value_1 = 3
WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id);
ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637
-- We can not pushdown a query if the target relation is reference table
UPDATE users_reference_copy_table
SET value_2 = 5
FROM events_test_table
WHERE users_reference_copy_table.user_id = events_test_table.user_id;
ERROR: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables
-- We cannot push down it if the query has outer join and using
UPDATE events_test_table
SET value_2 = users_test_table.user_id
FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
ERROR: a join with USING causes an internal naming conflict, use ON instead
-- We can not pushdown query if there is no partition key equality
UPDATE users_test_table
SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1)
FROM events_test_table
GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = (SELECT Count(*)
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT value_1
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
INTERSECT
SELECT Sum(value_1)
FROM events_test_table
GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_2 = (SELECT value_3
FROM users_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET value_2 = 2
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id > events_test_table.user_id AND
users_test_table.value_1 = events_test_table.value_1
GROUP BY
user_id
);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table
SET (value_1, value_2) = (2,1)
WHERE user_id IN
(SELECT user_id
FROM users_test_table
INTERSECT
SELECT user_id
FROM events_test_table);
ERROR: cannot push down this subquery
DETAIL: Intersect and Except are currently unsupported
-- Reference tables can not locate on the outer part of the outer join
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN
(SELECT DISTINCT e2.user_id
FROM users_reference_copy_table
LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- Volatile functions are also not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id * random();
ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE
UPDATE users_test_table
SET value_2 = 5 * random()
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Local tables are not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id;
ERROR: relation events_test_table_local is not distributed
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
ERROR: relation events_test_table_local is not distributed
UPDATE events_test_table_local
SET value_2 = 5
FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id;
ERROR: relation events_test_table_local is not distributed
-- Shard counts of tables must be equal to pushdown the query
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_2
WHERE users_test_table.user_id = events_test_table_2.user_id;
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Should error out due to multiple row return from subquery, but we can not get this information within
-- subquery pushdown planner. This query will be sent to worker with recursive planner.
DELETE FROM users_test_table
WHERE users_test_table.user_id = (SELECT user_id
FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Cursors are not supported
BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;
FETCH test_cursor;
user_id | value_1 | value_2 | value_3
---------+---------+---------+---------
8 | 4 | 13 | 0
(1 row)
UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ERROR: cannot run DML queries with cursors
ROLLBACK;
-- Stable functions are supported
SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 5
3 | Wed Jan 12 08:35:19 2011 PST | 9
3 | Mon Jan 12 08:35:19 2111 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 7
(3 rows)
@ -397,15 +788,16 @@ SELECT * FROM test_table_1;
id | date_col | col_3
----+------------------------------+-------
1 | Sat Apr 05 08:32:12 2014 PDT | 3
3 | Wed Jan 12 08:35:19 2011 PST | 3
3 | Mon Jan 12 08:35:19 2111 PST | 9
2 | Sun Feb 01 08:31:16 2015 PST | 3
(3 rows)
DELETE FROM test_table_1 WHERE date_col < current_timestamp;
SELECT * FROM test_table_1;
id | date_col | col_3
----+----------+-------
(0 rows)
id | date_col | col_3
----+------------------------------+-------
3 | Mon Jan 12 08:35:19 2111 PST | 9
(1 row)
DROP TABLE test_table_1;
-- Volatile functions are not supported

View File

@ -378,6 +378,8 @@ raw_data AS (
DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING *
)
INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id;
ERROR: cannot push down this subquery
DETAIL: Aggregates without group by are currently unsupported when a subquery references a column from another query
INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3);
-- read ids from the same table
WITH distinct_ids AS (
@ -390,7 +392,7 @@ update_data AS (
SELECT count(*) FROM update_data;
count
-------
3
6
(1 row)
-- read ids from a different table
@ -418,7 +420,7 @@ WITH update_data AS (
SELECT COUNT(*) FROM update_data;
count
-------
1
2
(1 row)
WITH delete_rows AS (
@ -427,10 +429,13 @@ WITH delete_rows AS (
SELECT * FROM delete_rows ORDER BY id, val;
id | val
----+-----
11 | 100
12 | 300
13 | 100
21 | 300
22 | 200
23 | 100
(3 rows)
(6 rows)
WITH delete_rows AS (
DELETE FROM summary_table WHERE id > 10 RETURNING *
@ -438,10 +443,7 @@ WITH delete_rows AS (
SELECT * FROM delete_rows ORDER BY id, counter;
id | counter
----+---------
11 | 1
12 | 1
13 | 1
(3 rows)
(0 rows)
-- Check modifiying CTEs inside a transaction
BEGIN;
@ -526,8 +528,11 @@ WITH deleted_rows AS (
DELETE FROM modify_table WHERE id IN (SELECT id FROM modify_table WHERE val = 4) RETURNING *
)
SELECT * FROM deleted_rows;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
id | val
----+-----
2 | 4
(1 row)
WITH select_rows AS (
SELECT id FROM modify_table WHERE val = 4
),
@ -537,15 +542,13 @@ deleted_rows AS (
SELECT * FROM deleted_rows;
id | val
----+-----
2 | 4
(1 row)
(0 rows)
WITH deleted_rows AS (
DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING *
)
SELECT * FROM deleted_rows;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
WITH select_rows AS (
SELECT val FROM modify_table WHERE id = 3
),
@ -562,8 +565,7 @@ WITH deleted_rows AS (
DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING *
)
SELECT * FROM deleted_rows;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
WITH select_rows AS (
SELECT ctid FROM modify_table WHERE id = 1
),
@ -644,8 +646,7 @@ raw_data AS (
RETURNING id, val
)
SELECT * FROM raw_data ORDER BY val;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Test with replication factor 2
SET citus.shard_replication_factor to 2;
DROP TABLE modify_table;

View File

@ -25,6 +25,7 @@ test: isolation_create_restore_point
test: isolation_create_distributed_table isolation_master_append_table isolation_master_apply_delete
test: isolation_multi_shard_modify_vs_all
test: isolation_modify_with_subquery_vs_dml
test: isolation_hash_copy_vs_all
test: isolation_append_copy_vs_all
test: isolation_range_copy_vs_all

View File

@ -0,0 +1,91 @@
setup
{
SET citus.shard_replication_factor to 2;
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
INSERT INTO users_test_table VALUES
(1, 5, 6, 7),
(2, 12, 7, 18),
(3, 23, 8, 25),
(4, 42, 9, 23),
(5, 35, 10, 17),
(6, 21, 11, 25),
(7, 27, 12, 18);
CREATE TABLE events_test_table (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table', 'user_id');
INSERT INTO events_test_table VALUES
(1, 5, 7, 7),
(3, 11, 78, 18),
(5, 22, 9, 25),
(7, 41, 10, 23),
(1, 20, 12, 25),
(3, 26, 13, 18),
(5, 17, 14, 4);
}
teardown
{
DROP TABLE users_test_table;
DROP TABLE events_test_table;
SET citus.shard_replication_factor to 1;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-insert_to_events_test_table"
{
INSERT INTO events_test_table VALUES(4,6,8,10);
}
step "s1-update_events_test_table"
{
UPDATE users_test_table SET value_1 = 3;
}
step "s1-delete_events_test_table"
{
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-modify_with_subquery_v1"
{
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
}
step "s2-modify_with_subquery_v2"
{
UPDATE users_test_table SET value_1 = 3 WHERE user_id IN (SELECT user_id FROM events_test_table);
}
step "s2-commit"
{
COMMIT;
}
# tests to check locks on subqueries are taken
permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-insert_to_events_test_table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-update_events_test_table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s2-modify_with_subquery_v1" "s1-delete_events_test_table" "s2-commit" "s1-commit"
permutation "s1-begin" "s2-begin" "s1-insert_to_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-update_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit"
permutation "s1-begin" "s2-begin" "s1-delete_events_test_table" "s2-modify_with_subquery_v1" "s1-commit" "s2-commit"

View File

@ -392,9 +392,23 @@ EXPLAIN (COSTS FALSE)
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part;
SET citus.explain_all_tasks TO off;
-- Test update with subquery
EXPLAIN (COSTS FALSE)
UPDATE lineitem_hash_part
SET l_suppkey = 12
FROM orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
-- Test delete with subquery
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_hash_part
USING orders_hash_part
WHERE orders_hash_part.o_orderkey = lineitem_hash_part.l_orderkey;
-- Test track tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;

View File

@ -135,16 +135,22 @@ INSERT INTO append_stage_table VALUES(1,3);
INSERT INTO append_stage_table VALUES(3,2);
INSERT INTO append_stage_table VALUES(5,4);
CREATE TABLE append_stage_table_2(id int, col_2 int);
INSERT INTO append_stage_table_2 VALUES(8,3);
INSERT INTO append_stage_table_2 VALUES(9,2);
INSERT INTO append_stage_table_2 VALUES(10,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
SELECT master_create_empty_shard('test_append_table');
SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port);
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table', 'localhost', :master_port);
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table;
DROP TABLE append_stage_table;
DROP TABLE append_stage_table_2;
DROP TABLE test_append_table;
-- Update multi shard of partitioned distributed table
@ -203,15 +209,407 @@ SET citus.multi_shard_modify_mode to sequential;
UPDATE tt2 SET col_2 = 3 RETURNING id, col_2;
DROP TABLE tt2;
-- Multiple RTEs are not supported
-- Multiple RTEs are only supported if subquery is pushdownable
SET citus.multi_shard_modify_mode to DEFAULT;
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table SET value_2 = (SELECT value_3 FROM users_test_table);
UPDATE users_test_table SET value_2 = (SELECT value_2 FROM events_test_table);
DELETE FROM users_test_table USING events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT user_id FROM events_test_table);
DELETE FROM users_test_table WHERE users_test_table.user_id = (SELECT value_1 FROM users_test_table);
-- To test colocation between tables in modify query
SET citus.shard_count to 6;
CREATE TABLE events_test_table_2 (user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('events_test_table_2', 'user_id');
\COPY events_test_table_2 FROM STDIN DELIMITER AS ',';
1, 5, 7, 7
3, 11, 78, 18
5, 22, 9, 25
7, 41, 10, 23
9, 34, 11, 21
1, 20, 12, 25
3, 26, 13, 18
5, 17, 14, 4
7, 37, 15, 22
9, 42, 16, 22
1, 60, 17, 17
3, 5, 18, 8
5, 15, 19, 44
7, 24, 20, 38
9, 54, 21, 17
\.
CREATE TABLE events_test_table_local (user_id int, value_1 int, value_2 int, value_3 int);
\COPY events_test_table_local FROM STDIN DELIMITER AS ',';
1, 5, 7, 7
3, 11, 78, 18
5, 22, 9, 25
7, 41, 10, 23
9, 34, 11, 21
1, 20, 12, 25
3, 26, 13, 18
5, 17, 14, 4
7, 37, 15, 22
9, 42, 16, 22
1, 60, 17, 17
3, 5, 18, 8
5, 15, 19, 44
7, 24, 20, 38
9, 54, 21, 17
\.
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2111-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
-- We can pushdown query if there is partition key equality
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
DELETE FROM users_test_table
USING events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table
SET value_1 = 3
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM users_test_table
WHERE user_id IN (SELECT user_id
FROM events_test_table);
DELETE FROM events_test_table_2
WHERE now() > (SELECT max(date_col)
FROM test_table_1
WHERE test_table_1.id = events_test_table_2.user_id
GROUP BY id)
RETURNING *;
UPDATE users_test_table
SET value_1 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id
AND events_test_table.user_id > 5;
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table);
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT user_id
FROM events_test_table) returning *;
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION ALL
SELECT user_id
FROM events_test_table) returning *;
UPDATE users_test_table
SET value_1 = 5
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id
GROUP BY
user_id
);
UPDATE users_test_table
SET value_3 = 1
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id = events_test_table.user_id AND
users_test_table.value_2 > events_test_table.value_2
GROUP BY
user_id
);
UPDATE users_test_table
SET value_2 = 4
WHERE
value_1 > 1 AND value_1 < 3
AND value_2 >= 1
AND user_id IN
(
SELECT
e1.user_id
FROM (
SELECT
user_id,
1 AS view_homepage
FROM events_test_table
WHERE
value_1 IN (0, 1)
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo
FROM events_test_table
WHERE
user_id = e1.user_id
) e2 ON true
);
UPDATE users_test_table
SET value_3 = 5
WHERE value_2 IN (SELECT AVG(value_1) OVER (PARTITION BY user_id) FROM events_test_table WHERE events_test_table.user_id = users_test_table.user_id);
-- Test it within transaction
BEGIN;
INSERT INTO users_test_table
SELECT * FROM events_test_table
WHERE events_test_table.user_id = 1 OR events_test_table.user_id = 5;
SELECT SUM(value_2) FROM users_test_table;
UPDATE users_test_table
SET value_2 = 1
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
SELECT SUM(value_2) FROM users_test_table;
COMMIT;
-- Test with schema
CREATE SCHEMA sec_schema;
CREATE TABLE sec_schema.tt1(id int, value_1 int);
SELECT create_distributed_table('sec_schema.tt1','id');
INSERT INTO sec_schema.tt1 values(1,1),(2,2),(7,7),(9,9);
UPDATE sec_schema.tt1
SET value_1 = 11
WHERE id < (SELECT max(value_2) FROM events_test_table_2
WHERE sec_schema.tt1.id = events_test_table_2.user_id
GROUP BY user_id)
RETURNING *;
DROP SCHEMA sec_schema CASCADE;
-- We don't need partition key equality with reference tables
UPDATE events_test_table
SET value_2 = 5
FROM users_reference_copy_table
WHERE users_reference_copy_table.user_id = events_test_table.value_1;
-- Both reference tables and hash distributed tables can be used in subquery
UPDATE events_test_table as ett
SET value_2 = 6
WHERE ett.value_3 IN (SELECT utt.value_3
FROM users_test_table as utt, users_reference_copy_table as uct
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
-- We don't need equality check with constant values in sub-select
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_reference_copy_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE user_id IN (SELECT 2);
UPDATE users_test_table
SET value_2 = 6
WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions
UPDATE test_table_1
SET col_3 = 6
WHERE date_col IN (SELECT now());
-- Test with prepared statements
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
PREPARE foo_plan_2(int,int) AS UPDATE users_test_table
SET value_1 = $1, value_3 = $2
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
EXECUTE foo_plan_2(1,5);
EXECUTE foo_plan_2(3,15);
EXECUTE foo_plan_2(5,25);
EXECUTE foo_plan_2(7,35);
EXECUTE foo_plan_2(9,45);
EXECUTE foo_plan_2(0,0);
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
-- Test with varying WHERE expressions
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id OR FALSE;
UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
-- Test with inactive shard-placement
-- manually set shardstate of one placement of users_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
-- manually set shardstate of one placement of events_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
-- Subquery must return single value to use it with comparison operators
UPDATE users_test_table as utt
SET value_1 = 3
WHERE value_2 > (SELECT value_3 FROM events_test_table as ett WHERE utt.user_id = ett.user_id);
-- We can not pushdown a query if the target relation is reference table
UPDATE users_reference_copy_table
SET value_2 = 5
FROM events_test_table
WHERE users_reference_copy_table.user_id = events_test_table.user_id;
-- We cannot push down it if the query has outer join and using
UPDATE events_test_table
SET value_2 = users_test_table.user_id
FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
-- We can not pushdown query if there is no partition key equality
UPDATE users_test_table
SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1)
FROM events_test_table
GROUP BY user_id);
UPDATE users_test_table
SET value_1 = (SELECT Count(*)
FROM events_test_table);
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
UNION
SELECT value_1
FROM events_test_table);
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN (SELECT user_id
FROM users_test_table
INTERSECT
SELECT Sum(value_1)
FROM events_test_table
GROUP BY user_id);
UPDATE users_test_table
SET value_2 = (SELECT value_3
FROM users_test_table);
UPDATE users_test_table
SET value_2 = 2
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_test_table
WHERE
users_test_table.user_id > events_test_table.user_id AND
users_test_table.value_1 = events_test_table.value_1
GROUP BY
user_id
);
UPDATE users_test_table
SET (value_1, value_2) = (2,1)
WHERE user_id IN
(SELECT user_id
FROM users_test_table
INTERSECT
SELECT user_id
FROM events_test_table);
-- Reference tables can not locate on the outer part of the outer join
UPDATE users_test_table
SET value_1 = 4
WHERE user_id IN
(SELECT DISTINCT e2.user_id
FROM users_reference_copy_table
LEFT JOIN users_test_table e2 ON (e2.user_id = users_reference_copy_table.value_1)) RETURNING *;
-- Volatile functions are also not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id * random();
UPDATE users_test_table
SET value_2 = 5 * random()
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
-- Local tables are not supported
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id;
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
UPDATE events_test_table_local
SET value_2 = 5
FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id;
-- Shard counts of tables must be equal to pushdown the query
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_2
WHERE users_test_table.user_id = events_test_table_2.user_id;
-- Should error out due to multiple row return from subquery, but we can not get this information within
-- subquery pushdown planner. This query will be sent to worker with recursive planner.
DELETE FROM users_test_table
WHERE users_test_table.user_id = (SELECT user_id
FROM events_test_table);
-- Cursors are not supported
BEGIN;
@ -221,12 +619,6 @@ UPDATE users_test_table SET value_2 = 5 WHERE CURRENT OF test_cursor;
ROLLBACK;
-- Stable functions are supported
CREATE TABLE test_table_1(id int, date_col timestamptz, col_3 int);
INSERT INTO test_table_1 VALUES(1, '2014-04-05 08:32:12', 5);
INSERT INTO test_table_1 VALUES(2, '2015-02-01 08:31:16', 7);
INSERT INTO test_table_1 VALUES(3, '2011-01-12 08:35:19', 9);
SELECT create_distributed_table('test_table_1', 'id');
SELECT * FROM test_table_1;
UPDATE test_table_1 SET col_3 = 3 WHERE date_col < now();
SELECT * FROM test_table_1;

View File

@ -221,6 +221,7 @@ SELECT * FROM modify_table ORDER BY id, val;
SELECT * FROM anchor_table ORDER BY id;
INSERT INTO modify_table VALUES (11, 1), (12, 2), (13, 3);
WITH select_data AS (
SELECT * FROM modify_table
),