Feature: INSERT INTO ... SELECT

This commit adds INSERT INTO ... SELECT feature for distributed tables.

We implement INSERT INTO ... SELECT by pushing down the SELECT to
each shard. To compute that we use the router planner, by adding
an "uninstantiated" constraint that the partition column be equal to a
certain value. standard_planner() distributes that constraint to all
the tables where it knows how to push the restriction safely. An example
is that the tables that are connected via equi joins.

The router planner then iterates over the target table's shards,
for each we replace the "uninstantiated" restriction, with one that
PruneShardList() handles. Do so by replacing the partitioning qual
parameter added in multi_planner() with the current shard's
actual boundary values. Also, add the current shard's boundary values to the
top level subquery to ensure that even if the partitioning qual is
not distributed to all the tables, we never run the queries on the shards
that don't match with the current shard boundaries. Finally, perform the
normal shard pruning to decide on whether to push the query to the
current shard or not.

We do not support certain SQLs on the subquery, which are described/commented
on ErrorIfInsertSelectQueryNotSupported().

We also added some locking on the router executor. When an INSERT/SELECT command
runs on a distributed table with replication factor >1, we need to ensure that
it sees the same result on each placement of a shard. So we added the ability
such that router executor takes exclusive locks on shards from which the SELECT
in an INSERT/SELECT reads in order to prevent concurrent changes. This is not a
very optimal solution, but it's simple and correct. The
citus.all_modifications_commutative can be used to avoid aggressive locking.
An INSERT/SELECT whose filters are known to exclude any ongoing writes can be
marked as commutative. See RequiresConsistentSnapshot() for the details.

We also moved the decison of whether the multiPlan should be executed on
the router executor or not to the planning phase. This allowed us to
integrate multi task router executor tasks to the router executor smoothly.
pull/859/head
Onder Kalaci 2016-10-04 11:27:53 +03:00
parent e0d83d65af
commit 1673ea937c
22 changed files with 3413 additions and 216 deletions

View File

@ -57,23 +57,18 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
executorType = JobExecutorType(multiPlan);
if (executorType == MULTI_EXECUTOR_ROUTER)
{
Task *task = NULL;
List *taskList = workerJob->taskList;
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
planStatement->planTree->targetlist, false);
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
/* router executor can only execute distributed plans with a single task */
Assert(list_length(taskList) == 1);
/* router executor cannot execute task with depencencies */
Assert(dependendJobList == NIL);
task = (Task *) linitial(taskList);
/* we need to set tupleDesc in executorStart */
queryDesc->tupDesc = tupleDescriptor;
/* drop into the router executor */
RouterExecutorStart(queryDesc, eflags, task);
RouterExecutorStart(queryDesc, eflags);
}
else
{

View File

@ -100,8 +100,8 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
TupleDesc tupleDescriptor);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
static bool IsReplicated(List *shardIntervalList);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination,
Tuplestorestate *tupleStore);
@ -133,14 +133,11 @@ static void MarkRemainingInactivePlacements(void);
* execution.
*/
void
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
RouterExecutorStart(QueryDesc *queryDesc, int eflags)
{
EState *executorState = NULL;
CmdType commandType = queryDesc->operation;
/* ensure that the task is not NULL */
Assert(task != NULL);
/* disallow triggers during distributed modify commands */
if (commandType != CMD_SELECT)
{
@ -314,15 +311,38 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
{
LockShardResource(shardId, lockMode);
}
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT.. SELECT
* commands from having a different effect on different placements.
*/
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockShardListResources(task->selectShardList, ExclusiveLock);
}
}
/*
* AcquireExecutorMultiShardLocks acquires shard locks need for execution
* of writes on multiple shards.
* AcquireExecutorMultiShardLocks acquires shard locks needed for execution
* of writes on multiple shards. In addition to honouring commutativity
* rules, we currently only allow a single multi-shard command on a shard at
* a time. Otherwise, concurrent multi-shard commands may take row-level
* locks on the shard placements in a different order and create a distributed
* deadlock. This applies even when writes are commutative and/or there is
* no replication.
*
* 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as ShareUpdateExclusiveLock.
*
* 2. If citus.all_modifications_commutative is false, then only the shards
* with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
* lock is acquired with ShareUpdateExclusiveLock.
@ -330,65 +350,121 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with ShareLock,
* which is normally obtained by single-shard commutative writes.
* in the same order on all placements. It does not conflict with
* RowExclusiveLock, which is normally obtained by single-shard, commutative
* writes.
*/
static void
AcquireExecutorMultiShardLocks(List *shardIntervalList)
AcquireExecutorMultiShardLocks(List *taskList)
{
LOCKMODE lockMode = NoLock;
ListCell *taskCell = NULL;
if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
LOCKMODE lockMode = NoLock;
if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
{
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is no replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
LockShardResource(task->anchorShardId, lockMode);
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is not replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT..SELECT
* commands from having different effects on different placements.
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockShardListResources(shardIntervalList, lockMode);
LockShardListResources(task->selectShardList, ExclusiveLock);
}
}
}
/*
* IsReplicated checks whether any of the shards in the given list has more
* than one replica.
* RequiresConsistentSnapshot returns true if the given task need to take
* the necessary locks to ensure that a subquery in the INSERT ... SELECT
* query returns the same output for all task placements.
*/
static bool
IsReplicated(List *shardIntervalList)
RequiresConsistentSnapshot(Task *task)
{
ListCell *shardIntervalCell;
bool hasReplication = false;
bool requiresIsolation = false;
foreach(shardIntervalCell, shardIntervalList)
if (!task->insertSelectQuery)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList->length > 1)
{
hasReplication = true;
break;
}
/*
* Only INSERT/SELECT commands currently require SELECT isolation.
* Other commands do not read from other shards.
*/
requiresIsolation = false;
}
else if (list_length(task->taskPlacementList) == 1)
{
/*
* If there is only one replica then we fully rely on PostgreSQL to
* provide SELECT isolation. In this case, we do not provide isolation
* across the shards, but that was never our intention.
*/
requiresIsolation = false;
}
else if (AllModificationsCommutative)
{
/*
* An INSERT/SELECT is commutative with other writes if it excludes
* any ongoing writes based on the filter conditions. Without knowing
* whether this is true, we assume the user took this into account
* when enabling citus.all_modifications_commutative. This option
* gives users an escape from aggressive locking during INSERT/SELECT.
*/
requiresIsolation = false;
}
else
{
/*
* If this is a non-commutative write, then we need to block ongoing
* writes to make sure that the subselect returns the same result
* on all placements.
*/
requiresIsolation = true;
}
return hasReplication;
return requiresIsolation;
}
@ -812,7 +888,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
AcquireExecutorMultiShardLocks(shardIntervalList);
AcquireExecutorMultiShardLocks(taskList);
/* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName);

View File

@ -49,7 +49,7 @@ JobExecutorType(MultiPlan *multiPlan)
double tasksPerNode = taskCount / ((double) workerNodeCount);
int dependedJobCount = list_length(job->dependedJobList);
MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = RouterExecutablePlan(multiPlan, executorType);
bool routerExecutablePlan = multiPlan->routerExecutable;
/* check if can switch to router executor */
if (routerExecutablePlan)
@ -109,78 +109,6 @@ JobExecutorType(MultiPlan *multiPlan)
}
/*
* RouterExecutablePlan returns whether a multi-plan can be executed using the
* router executor. Modify queries are always router executable, select queries
* are router executable only if executorType is real time.
*/
bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{
Job *job = multiPlan->workerJob;
TaskType taskType = TASK_TYPE_INVALID_FIRST;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
Task *workerTask = NULL;
List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false;
/* router executor cannot execute queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* check if the first task is a modify or a router task, short-circuit if so */
workerTask = (Task *) linitial(workerTaskList);
taskType = workerTask->taskType;
if (taskType == MODIFY_TASK || taskType == ROUTER_TASK)
{
return true;
}
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/* router executor cannot execute queries with dependent data fetch tasks */
workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0)
{
return false;
}
/* router executor cannot execute queries with order by */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/*
* Router executor cannot execute queries with aggregates.
* Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
return true;
}
/*
* MaxMasterConnectionCount returns the number of connections a master can open.
* A master cannot create more than a certain number of file descriptors (FDs).

View File

@ -228,7 +228,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
es->indent += 1;
}
routerExecutablePlan = RouterExecutablePlan(multiPlan, TaskExecutorType);
routerExecutablePlan = multiPlan->routerExecutable;
if (routerExecutablePlan)
{

View File

@ -152,7 +152,6 @@ static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHa
static void ErrorIfUnsupportedTableCombination(Query *queryTree);
static void ErrorIfUnsupportedUnionQuery(Query *unionQuery);
static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
static bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList);
static Query * LateralQuery(Query *query);
@ -3318,7 +3317,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
* Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column.
*/
static bool
bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
{
bool isPartitionColumn = false;
@ -3863,7 +3862,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
/*
* RelationIdList returns list of unique relation ids in query tree.
*/
List *
static List *
RelationIdList(Query *query)
{
List *rangeTableList = NIL;

View File

@ -36,6 +36,7 @@
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/shardinterval_utils.h"
@ -115,7 +116,8 @@ static uint32 HashPartitionCount(void);
static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
uint32 shardIntervalCount);
/* Local functions forward declarations for task list creation */
/* Local functions forward declarations for task list creation and helper functions */
static bool MultiPlanRouterExecutable(MultiPlan *multiPlan);
static Job * BuildJobTreeTaskList(Job *jobTree);
static List * SubquerySqlTaskList(Job *job);
static List * SqlTaskList(Job *job);
@ -130,8 +132,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static Var * MakeInt4Column(void);
static Const * MakeInt4Constant(Datum constantValue);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
static List * BuildRestrictInfoList(List *qualList);
static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
@ -150,8 +150,6 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
static StringInfo NodeNameArrayString(List *workerNodeList);
static StringInfo NodePortArrayString(List *workerNodeList);
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
RangeTableFragment *fragment);
@ -222,11 +220,73 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree)
multiPlan->workerJob = workerJob;
multiPlan->masterQuery = masterQuery;
multiPlan->masterTableName = jobSchemaName->data;
multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan);
return multiPlan;
}
/*
* MultiPlanRouterExecutable returns true if the input multiPlan is
* router executable.
*
* Note that all the multi plans that are created by router planner are
* already router executable. Thus, this function should only be called
* for multi plans that are not generated by router planner.
*/
static bool
MultiPlanRouterExecutable(MultiPlan *multiPlan)
{
Query *masterQuery = multiPlan->masterQuery;
Job *job = multiPlan->workerJob;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
bool masterQueryHasAggregates = false;
/* router executor cannot execute SELECT queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/*
* Router executor does not run master query. This means that aggregation and
* sorting on the master query wouldn't be executed. Thus, such plans shouldn't be
* qualified as router executable.
*/
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/*
* Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
/* FIXME: I tend to think it's time to remove this */
if (TaskExecutorType != MULTI_EXECUTOR_REAL_TIME)
{
return false;
}
return true;
}
/*
* BuildJobTree builds the physical job tree from the given logical plan tree.
* The function walks over the logical plan from the bottom up, finds boundaries
@ -3017,7 +3077,7 @@ MakeHashedOperatorExpression(OpExpr *operatorExpression)
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
*/
static Var *
Var *
MakeInt4Column()
{
Index tableId = 0;
@ -3037,7 +3097,7 @@ MakeInt4Column()
* MakeInt4Constant creates a new constant of int4 type and assigns the given
* value as a constant value.
*/
static Const *
Const *
MakeInt4Constant(Datum constantValue)
{
Oid constantType = INT4OID;
@ -3939,7 +3999,7 @@ DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId)
* CreateBasicTask creates a task, initializes fields that are common to each task,
* and returns the created task.
*/
static Task *
Task *
CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString)
{
Task *task = CitusMakeNode(Task);

View File

@ -59,6 +59,27 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning)
{
originalQuery = copyObject(parse);
/*
* We implement INSERT INTO .. SELECT by pushing down the SELECT to
* each shard. To compute that we use the router planner, by adding
* an "uninstantiated" constraint that the partition column be equal to a
* certain value. standard_planner() distributes that constraint to
* the baserestrictinfos to all the tables where it knows how to push
* the restriction safely. An example is that the tables that are
* connected via equi joins.
*
* The router planner then iterates over the target table's shards,
* for each we replace the "uninstantiated" restriction, with one that
* PruneShardList() handles, and then generate a query for that
* individual shard. If any of the involved tables don't prune down
* to a single shard, or if the pruned shards aren't colocated,
* we error out.
*/
if (InsertSelectQuery(parse))
{
AddUninstantiatedPartitionRestriction(parse);
}
}
/* create a restriction context and put it at the end if context list */

File diff suppressed because it is too large Load Diff

View File

@ -59,7 +59,16 @@ deparse_shard_query_test(PG_FUNCTION_ARGS)
Query *query = lfirst(queryTreeCell);
StringInfo buffer = makeStringInfo();
ReorderInsertSelectTargetListsIfExists(query);
/* reoreder the target list only for INSERT .. SELECT queries */
if (InsertSelectQuery(query))
{
RangeTblEntry *insertRte = linitial(query->rtable);
RangeTblEntry *subqueryRte = lsecond(query->rtable);
ReorderInsertSelectTargetLists(query, insertRte, subqueryRte);
}
deparse_shard_query(query, InvalidOid, 0, buffer);
elog(INFO, "query: %s", buffer->data);

View File

@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* le * test/src/test_helper_functions.c
* test/src/test_helper_functions.c
*
* This file contains helper functions used in many Citus tests.
*

View File

@ -277,6 +277,7 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery);
WRITE_STRING_FIELD(masterTableName);
WRITE_BOOL_FIELD(routerExecutable);
}
@ -493,6 +494,8 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(selectShardList);
}
#if (PG_VERSION_NUM < 90600)

View File

@ -184,6 +184,7 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_BOOL_FIELD(routerExecutable);
READ_DONE();
}
@ -289,6 +290,8 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(selectShardList);
READ_DONE();
}

View File

@ -122,6 +122,7 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -169,6 +169,9 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
bool insertSelectQuery;
List *selectShardList; /* only applies INSERT/SELECT tasks */
} Task;
@ -205,6 +208,7 @@ typedef struct MultiPlan
Job *workerJob;
Query *masterQuery;
char *masterTableName;
bool routerExecutable;
} MultiPlan;
@ -227,6 +231,8 @@ extern int TaskAssignmentPolicy;
/* Function declarations for building physical plans and constructing queries */
extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree);
extern StringInfo ShardFetchQueryString(uint64 shardId);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
/* Function declarations for shard pruning */
extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
@ -243,9 +249,10 @@ extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval)
extern bool SimpleOpExpression(Expr *clause);
extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn);
/* helper functions */
extern Var * MakeInt4Column(void);
extern Const * MakeInt4Constant(Datum constantValue);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
/* Function declarations for sorting shards. */
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);

View File

@ -33,7 +33,7 @@ typedef struct XactShardConnSet
extern bool AllModificationsCommutative;
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);

View File

@ -21,14 +21,21 @@
#include "nodes/parsenodes.h"
/* reserved parameted id, we chose a negative number since it is not assigned by postgres */
#define UNINSTANTIATED_PARAMETER_ID INT_MIN
/* reserved alias name for UPSERTs */
#define UPSERT_ALIAS "citus_table_alias"
#define CITUS_TABLE_ALIAS "citus_table_alias"
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
extern void AddUninstantiatedPartitionRestriction(Query *originalQuery);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte);
extern bool InsertSelectQuery(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -191,7 +191,6 @@ extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */
extern bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan);
extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);

File diff suppressed because it is too large Load Diff

View File

@ -205,10 +205,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders;
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
-- INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;

View File

@ -29,6 +29,8 @@ test: multi_create_table_constraints
test: multi_master_protocol
test: multi_load_data
test: multi_insert_select
# ----------
# Miscellaneous tests to check our query planning behavior
# ----------

View File

@ -0,0 +1,608 @@
--
-- MULTI_INSERT_SELECT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000;
-- create co-located tables
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
SELECT create_distributed_table('raw_events_first', 'user_id');
CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
SELECT create_distributed_table('raw_events_second', 'user_id');
CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg));
SELECT create_distributed_table('agg_events', 'user_id');;
-- create the reference table as well
CREATE TABLE reference_table (user_id int);
SELECT create_reference_table('reference_table');
-- set back to the defaults
SET citus.shard_count = DEFAULT;
SET citus.shard_replication_factor = DEFAULT;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(1, now(), 10, 100, 1000.1, 10000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(2, now(), 20, 200, 2000.1, 20000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(3, now(), 30, 300, 3000.1, 30000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(4, now(), 40, 400, 4000.1, 40000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(5, now(), 50, 500, 5000.1, 50000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(6, now(), 60, 600, 6000.1, 60000);
SET client_min_messages TO DEBUG4;
-- raw table to raw table
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
-- see that our first multi shard INSERT...SELECT works expected
SET client_min_messages TO INFO;
SELECT
raw_events_first.user_id
FROM
raw_events_first, raw_events_second
WHERE
raw_events_first.user_id = raw_events_second.user_id;
-- see that we get unique vialitons
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
-- try a single shard query
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7;
SET client_min_messages TO INFO;
-- add one more row
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(8, now(), 80, 800, 8000, 80000);
-- reorder columns
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
user_id = 8;
-- a zero shard select
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
false;
-- another zero shard select
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
0 != 0;
-- add one more row
SET client_min_messages TO INFO;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(9, now(), 90, 900, 9000, 90000);
-- show that RETURNING also works
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (user_id, value_1, value_3)
SELECT
user_id, value_1, value_3
FROM
raw_events_first
WHERE
value_3 = 9000
RETURNING *;
-- hits two shards
INSERT INTO raw_events_second (user_id, value_1, value_3)
SELECT
user_id, value_1, value_3
FROM
raw_events_first
WHERE
user_id = 9 OR user_id = 16
RETURNING *;
-- now do some aggregations
INSERT INTO agg_events
SELECT
user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4)
FROM
raw_events_first
GROUP BY
user_id;
-- group by column not exists on the SELECT target list
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
SELECT
sum(value_3), count(value_4), sum(value_1), user_id
FROM
raw_events_first
GROUP BY
value_2, user_id
RETURNING *;
-- some subquery tests
INSERT INTO agg_events
(value_1_agg,
user_id)
SELECT SUM(value_1),
id
FROM (SELECT raw_events_second.user_id AS id,
raw_events_second.value_1
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo
GROUP BY id;
-- subquery one more level depth
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo;
-- join between subqueries
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- add one more level subqueris on top of subquery JOINs
INSERT INTO agg_events
(user_id, value_4_agg)
SELECT
outer_most.id, max(outer_most.value)
FROM
(
SELECT f2.id as id, f2.v4 as value FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id)) as outer_most
GROUP BY
outer_most.id;
-- subqueries in WHERE clause
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE user_id = 2);
-- some UPSERTS
INSERT INTO agg_events AS ae
(
user_id,
value_1_agg,
agg_time
)
SELECT user_id,
value_1,
time
FROM raw_events_first
ON conflict (user_id, value_1_agg)
DO UPDATE
SET agg_time = EXCLUDED.agg_time
WHERE ae.agg_time < EXCLUDED.agg_time;
-- upserts with returning
INSERT INTO agg_events AS ae
(
user_id,
value_1_agg,
agg_time
)
SELECT user_id,
value_1,
time
FROM raw_events_first
ON conflict (user_id, value_1_agg)
DO UPDATE
SET agg_time = EXCLUDED.agg_time
WHERE ae.agg_time < EXCLUDED.agg_time
RETURNING user_id, value_1_agg;
INSERT INTO agg_events (user_id, value_1_agg)
SELECT
user_id, sum(value_1 + value_2)
FROM
raw_events_first GROUP BY user_id;
-- FILTER CLAUSE
INSERT INTO agg_events (user_id, value_1_agg)
SELECT
user_id, sum(value_1 + value_2) FILTER (where value_3 = 15)
FROM
raw_events_first GROUP BY user_id;
-- a test with reference table JOINs
INSERT INTO
agg_events (user_id, value_1_agg)
SELECT
raw_events_first.user_id, sum(value_1)
FROM
reference_table, raw_events_first
WHERE
raw_events_first.user_id = reference_table.user_id
GROUP BY
raw_events_first.user_id;
-- a note on the outer joins is that
-- we filter out outer join results
-- where partition column returns
-- NULL. Thus, we could INSERT less rows
-- than we expect from subquery result.
-- see the following tests
SET client_min_messages TO INFO;
-- we don't want to see constraint vialotions, so truncate first
TRUNCATE agg_events;
-- add a row to first table to make table contents different
INSERT INTO raw_events_second (user_id, time, value_1, value_2, value_3, value_4) VALUES
(10, now(), 100, 10000, 10000, 100000);
DELETE FROM raw_events_second WHERE user_id = 2;
-- we select 11 rows
SELECT t1.user_id AS col1,
t2.user_id AS col2
FROM raw_events_first t1
FULL JOIN raw_events_second t2
ON t1.user_id = t2.user_id
ORDER BY t1.user_id,
t2.user_id;
SET client_min_messages TO DEBUG4;
-- we insert 10 rows since we filtered out
-- NULL partition column values
INSERT INTO agg_events (user_id, value_1_agg)
SELECT t1.user_id AS col1,
t2.user_id AS col2
FROM raw_events_first t1
FULL JOIN raw_events_second t2
ON t1.user_id = t2.user_id;
SET client_min_messages TO INFO;
-- see that the results are different from the SELECT query
SELECT
user_id, value_1_agg
FROM
agg_events
ORDER BY
user_id, value_1_agg;
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
TRUNCATE agg_events;
SET client_min_messages TO DEBUG4;
-- DISTINCT clause
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT value_1, user_id
FROM
raw_events_first;
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG4;
-- we do not support DISTINCT ON clauses
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (value_1) value_1, user_id
FROM
raw_events_first;
-- We do not support some CTEs
WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)
INSERT INTO agg_events
(value_1_agg, user_id)
SELECT
v1_agg, user_id
FROM
fist_table_agg;
-- We do support some CTEs
INSERT INTO agg_events
WITH sub_cte AS (SELECT 1)
SELECT
raw_events_first.user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) INTERSECT
(SELECT user_id FROM raw_events_first);
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
-- unsupported JOIN
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id != raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo;
-- INSERT partition column does not match with SELECT partition column
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.value_3 AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_3) AS foo;
-- error cases
-- no part column at all
INSERT INTO raw_events_second
(value_1)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(value_1)
SELECT user_id
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id * 2
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id :: bigint
FROM raw_events_first;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
value_2
FROM raw_events_first
GROUP BY user_id,
value_2;
-- tables should be co-located
INSERT INTO agg_events (user_id)
SELECT
user_id
FROM
reference_table;
-- unsupported joins between subqueries
-- we do not return bare partition column on the inner query
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second.value_1 AS v1,
SUM(raw_events_second.user_id) AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- the second part of the query is not routable since
-- no GROUP BY on the partition column
INSERT INTO agg_events
(user_id)
SELECT f.id FROM
(SELECT
id
FROM (SELECT raw_events_first.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second.value_1 AS v1,
SUM(raw_events_second.user_id) AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- cannot pushdown the query since the JOIN is not equi JOIN
INSERT INTO agg_events
(user_id, value_4_agg)
SELECT
outer_most.id, max(outer_most.value)
FROM
(
SELECT f2.id as id, f2.v4 as value FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id;
-- we currently not support grouping sets
INSERT INTO agg_events
(user_id,
value_1_agg,
value_2_agg)
SELECT user_id,
Sum(value_1) AS sum_val1,
Sum(value_2) AS sum_val2
FROM raw_events_second
GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) );
-- set back to INFO
SET client_min_messages TO INFO;
-- Views does not work
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_second SELECT * FROM test_view;

View File

@ -150,8 +150,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
-- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders;
-- INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)