Merge pull request #859 from citusdata/feature/insert_select

Feature/insert select
pull/914/head
Önder Kalacı 2016-10-26 11:27:16 +03:00 committed by GitHub
commit fa8d39ec91
23 changed files with 4372 additions and 183 deletions

View File

@ -57,23 +57,18 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
executorType = JobExecutorType(multiPlan); executorType = JobExecutorType(multiPlan);
if (executorType == MULTI_EXECUTOR_ROUTER) if (executorType == MULTI_EXECUTOR_ROUTER)
{ {
Task *task = NULL;
List *taskList = workerJob->taskList;
TupleDesc tupleDescriptor = ExecCleanTypeFromTL( TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
planStatement->planTree->targetlist, false); planStatement->planTree->targetlist, false);
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList; List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
/* router executor can only execute distributed plans with a single task */ /* router executor cannot execute task with depencencies */
Assert(list_length(taskList) == 1);
Assert(dependendJobList == NIL); Assert(dependendJobList == NIL);
task = (Task *) linitial(taskList);
/* we need to set tupleDesc in executorStart */ /* we need to set tupleDesc in executorStart */
queryDesc->tupDesc = tupleDescriptor; queryDesc->tupDesc = tupleDescriptor;
/* drop into the router executor */ /* drop into the router executor */
RouterExecutorStart(queryDesc, eflags, task); RouterExecutorStart(queryDesc, eflags);
} }
else else
{ {

View File

@ -100,8 +100,8 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
TupleDesc tupleDescriptor); TupleDesc tupleDescriptor);
static List * TaskShardIntervalList(List *taskList); static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *shardIntervalList); static void AcquireExecutorMultiShardLocks(List *taskList);
static bool IsReplicated(List *shardIntervalList); static bool RequiresConsistentSnapshot(Task *task);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination, DestReceiver *destination,
Tuplestorestate *tupleStore); Tuplestorestate *tupleStore);
@ -133,14 +133,11 @@ static void MarkRemainingInactivePlacements(void);
* execution. * execution.
*/ */
void void
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) RouterExecutorStart(QueryDesc *queryDesc, int eflags)
{ {
EState *executorState = NULL; EState *executorState = NULL;
CmdType commandType = queryDesc->operation; CmdType commandType = queryDesc->operation;
/* ensure that the task is not NULL */
Assert(task != NULL);
/* disallow triggers during distributed modify commands */ /* disallow triggers during distributed modify commands */
if (commandType != CMD_SELECT) if (commandType != CMD_SELECT)
{ {
@ -314,15 +311,38 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
{ {
LockShardResource(shardId, lockMode); LockShardResource(shardId, lockMode);
} }
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT.. SELECT
* commands from having a different effect on different placements.
*/
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockShardListResources(task->selectShardList, ExclusiveLock);
}
} }
/* /*
* AcquireExecutorMultiShardLocks acquires shard locks need for execution * AcquireExecutorMultiShardLocks acquires shard locks needed for execution
* of writes on multiple shards. * of writes on multiple shards. In addition to honouring commutativity
* rules, we currently only allow a single multi-shard command on a shard at
* a time. Otherwise, concurrent multi-shard commands may take row-level
* locks on the shard placements in a different order and create a distributed
* deadlock. This applies even when writes are commutative and/or there is
* no replication.
* *
* 1. If citus.all_modifications_commutative is set to true, then all locks * 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as ShareUpdateExclusiveLock. * are acquired as ShareUpdateExclusiveLock.
*
* 2. If citus.all_modifications_commutative is false, then only the shards * 2. If citus.all_modifications_commutative is false, then only the shards
* with 2 or more replicas are locked with ExclusiveLock. Otherwise, the * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
* lock is acquired with ShareUpdateExclusiveLock. * lock is acquired with ShareUpdateExclusiveLock.
@ -330,65 +350,121 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* ShareUpdateExclusiveLock conflicts with itself such that only one * ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts * multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with ShareLock, * in the same order on all placements. It does not conflict with
* which is normally obtained by single-shard commutative writes. * RowExclusiveLock, which is normally obtained by single-shard, commutative
* writes.
*/ */
static void static void
AcquireExecutorMultiShardLocks(List *shardIntervalList) AcquireExecutorMultiShardLocks(List *taskList)
{ {
LOCKMODE lockMode = NoLock; ListCell *taskCell = NULL;
if (AllModificationsCommutative || !IsReplicated(shardIntervalList)) foreach(taskCell, taskList)
{ {
Task *task = (Task *) lfirst(taskCell);
LOCKMODE lockMode = NoLock;
if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
{
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is no replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
LockShardResource(task->anchorShardId, lockMode);
/* /*
* When all writes are commutative then we only need to prevent multi-shard * If the task has a subselect, then we may need to lock the shards from which
* commands from running concurrently with each other and with commands * the query selects as well to prevent the subselects from seeing different
* that are explicitly non-commutative. When there is not replication then * results on different replicas. In particular this prevents INSERT..SELECT
* we only need to prevent concurrent multi-shard commands. * commands from having different effects on different placements.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*/ */
lockMode = ShareUpdateExclusiveLock; if (RequiresConsistentSnapshot(task))
} {
else /*
{ * ExclusiveLock conflicts with all lock types used by modifications
/* * and therefore prevents other modifications from running
* When there is replication, prevent all concurrent writes to the same * concurrently.
* shards to ensure the writes are ordered. */
*/
lockMode = ExclusiveLock;
}
LockShardListResources(shardIntervalList, lockMode); LockShardListResources(task->selectShardList, ExclusiveLock);
}
}
} }
/* /*
* IsReplicated checks whether any of the shards in the given list has more * RequiresConsistentSnapshot returns true if the given task need to take
* than one replica. * the necessary locks to ensure that a subquery in the INSERT ... SELECT
* query returns the same output for all task placements.
*/ */
static bool static bool
IsReplicated(List *shardIntervalList) RequiresConsistentSnapshot(Task *task)
{ {
ListCell *shardIntervalCell; bool requiresIsolation = false;
bool hasReplication = false;
foreach(shardIntervalCell, shardIntervalList) if (!task->insertSelectQuery)
{ {
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); /*
uint64 shardId = shardInterval->shardId; * Only INSERT/SELECT commands currently require SELECT isolation.
List *shardPlacementList = FinalizedShardPlacementList(shardId); * Other commands do not read from other shards.
if (shardPlacementList->length > 1) */
{
hasReplication = true; requiresIsolation = false;
break; }
} else if (list_length(task->taskPlacementList) == 1)
{
/*
* If there is only one replica then we fully rely on PostgreSQL to
* provide SELECT isolation. In this case, we do not provide isolation
* across the shards, but that was never our intention.
*/
requiresIsolation = false;
}
else if (AllModificationsCommutative)
{
/*
* An INSERT/SELECT is commutative with other writes if it excludes
* any ongoing writes based on the filter conditions. Without knowing
* whether this is true, we assume the user took this into account
* when enabling citus.all_modifications_commutative. This option
* gives users an escape from aggressive locking during INSERT/SELECT.
*/
requiresIsolation = false;
}
else
{
/*
* If this is a non-commutative write, then we need to block ongoing
* writes to make sure that the subselect returns the same result
* on all placements.
*/
requiresIsolation = true;
} }
return hasReplication; return requiresIsolation;
} }
@ -812,7 +888,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
shardIntervalList = TaskShardIntervalList(taskList); shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */ /* ensure that there are no concurrent modifications on the same shards */
AcquireExecutorMultiShardLocks(shardIntervalList); AcquireExecutorMultiShardLocks(taskList);
/* open connection to all relevant placements, if not already open */ /* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName); OpenTransactionsToAllShardPlacements(shardIntervalList, userName);

View File

@ -49,7 +49,7 @@ JobExecutorType(MultiPlan *multiPlan)
double tasksPerNode = taskCount / ((double) workerNodeCount); double tasksPerNode = taskCount / ((double) workerNodeCount);
int dependedJobCount = list_length(job->dependedJobList); int dependedJobCount = list_length(job->dependedJobList);
MultiExecutorType executorType = TaskExecutorType; MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = RouterExecutablePlan(multiPlan, executorType); bool routerExecutablePlan = multiPlan->routerExecutable;
/* check if can switch to router executor */ /* check if can switch to router executor */
if (routerExecutablePlan) if (routerExecutablePlan)
@ -109,78 +109,6 @@ JobExecutorType(MultiPlan *multiPlan)
} }
/*
* RouterExecutablePlan returns whether a multi-plan can be executed using the
* router executor. Modify queries are always router executable, select queries
* are router executable only if executorType is real time.
*/
bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{
Job *job = multiPlan->workerJob;
TaskType taskType = TASK_TYPE_INVALID_FIRST;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
Task *workerTask = NULL;
List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false;
/* router executor cannot execute queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* check if the first task is a modify or a router task, short-circuit if so */
workerTask = (Task *) linitial(workerTaskList);
taskType = workerTask->taskType;
if (taskType == MODIFY_TASK || taskType == ROUTER_TASK)
{
return true;
}
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/* router executor cannot execute queries with dependent data fetch tasks */
workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0)
{
return false;
}
/* router executor cannot execute queries with order by */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/*
* Router executor cannot execute queries with aggregates.
* Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
return true;
}
/* /*
* MaxMasterConnectionCount returns the number of connections a master can open. * MaxMasterConnectionCount returns the number of connections a master can open.
* A master cannot create more than a certain number of file descriptors (FDs). * A master cannot create more than a certain number of file descriptors (FDs).

View File

@ -228,7 +228,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
es->indent += 1; es->indent += 1;
} }
routerExecutablePlan = RouterExecutablePlan(multiPlan, TaskExecutorType); routerExecutablePlan = multiPlan->routerExecutable;
if (routerExecutablePlan) if (routerExecutablePlan)
{ {

View File

@ -152,7 +152,6 @@ static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHa
static void ErrorIfUnsupportedTableCombination(Query *queryTree); static void ErrorIfUnsupportedTableCombination(Query *queryTree);
static void ErrorIfUnsupportedUnionQuery(Query *unionQuery); static void ErrorIfUnsupportedUnionQuery(Query *unionQuery);
static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
static bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList); static bool FullCompositeFieldList(List *compositeFieldList);
static Query * LateralQuery(Query *query); static Query * LateralQuery(Query *query);
@ -3318,7 +3317,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
* Note that if the given expression is a field of a composite type, then this * Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column. * function checks if this composite column is a partition column.
*/ */
static bool bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query) IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
{ {
bool isPartitionColumn = false; bool isPartitionColumn = false;
@ -3863,7 +3862,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
/* /*
* RelationIdList returns list of unique relation ids in query tree. * RelationIdList returns list of unique relation ids in query tree.
*/ */
List * static List *
RelationIdList(Query *query) RelationIdList(Query *query)
{ {
List *rangeTableList = NIL; List *rangeTableList = NIL;

View File

@ -36,6 +36,7 @@
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
@ -115,7 +116,8 @@ static uint32 HashPartitionCount(void);
static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
uint32 shardIntervalCount); uint32 shardIntervalCount);
/* Local functions forward declarations for task list creation */ /* Local functions forward declarations for task list creation and helper functions */
static bool MultiPlanRouterExecutable(MultiPlan *multiPlan);
static Job * BuildJobTreeTaskList(Job *jobTree); static Job * BuildJobTreeTaskList(Job *jobTree);
static List * SubquerySqlTaskList(Job *job); static List * SubquerySqlTaskList(Job *job);
static List * SqlTaskList(Job *job); static List * SqlTaskList(Job *job);
@ -130,8 +132,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber); int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn); static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static Var * MakeInt4Column(void);
static Const * MakeInt4Constant(Datum constantValue);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression); static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
static List * BuildRestrictInfoList(List *qualList); static List * BuildRestrictInfoList(List *qualList);
static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
@ -150,8 +150,6 @@ static List * DataFetchTaskList(uint64 jobId, uint32 taskIdIndex, List *fragment
static StringInfo NodeNameArrayString(List *workerNodeList); static StringInfo NodeNameArrayString(List *workerNodeList);
static StringInfo NodePortArrayString(List *workerNodeList); static StringInfo NodePortArrayString(List *workerNodeList);
static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId); static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId);
static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList); static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry, static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
RangeTableFragment *fragment); RangeTableFragment *fragment);
@ -222,11 +220,73 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree)
multiPlan->workerJob = workerJob; multiPlan->workerJob = workerJob;
multiPlan->masterQuery = masterQuery; multiPlan->masterQuery = masterQuery;
multiPlan->masterTableName = jobSchemaName->data; multiPlan->masterTableName = jobSchemaName->data;
multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan);
return multiPlan; return multiPlan;
} }
/*
* MultiPlanRouterExecutable returns true if the input multiPlan is
* router executable.
*
* Note that all the multi plans that are created by router planner are
* already router executable. Thus, this function should only be called
* for multi plans that are not generated by router planner.
*/
static bool
MultiPlanRouterExecutable(MultiPlan *multiPlan)
{
Query *masterQuery = multiPlan->masterQuery;
Job *job = multiPlan->workerJob;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
bool masterQueryHasAggregates = false;
/* router executor cannot execute SELECT queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/*
* Router executor does not run master query. This means that aggregation and
* sorting on the master query wouldn't be executed. Thus, such plans shouldn't be
* qualified as router executable.
*/
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/*
* Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
/* FIXME: I tend to think it's time to remove this */
if (TaskExecutorType != MULTI_EXECUTOR_REAL_TIME)
{
return false;
}
return true;
}
/* /*
* BuildJobTree builds the physical job tree from the given logical plan tree. * BuildJobTree builds the physical job tree from the given logical plan tree.
* The function walks over the logical plan from the bottom up, finds boundaries * The function walks over the logical plan from the bottom up, finds boundaries
@ -3017,7 +3077,7 @@ MakeHashedOperatorExpression(OpExpr *operatorExpression)
* MakeInt4Column creates a column of int4 type with invalid table id and max * MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number. * attribute number.
*/ */
static Var * Var *
MakeInt4Column() MakeInt4Column()
{ {
Index tableId = 0; Index tableId = 0;
@ -3037,7 +3097,7 @@ MakeInt4Column()
* MakeInt4Constant creates a new constant of int4 type and assigns the given * MakeInt4Constant creates a new constant of int4 type and assigns the given
* value as a constant value. * value as a constant value.
*/ */
static Const * Const *
MakeInt4Constant(Datum constantValue) MakeInt4Constant(Datum constantValue)
{ {
Oid constantType = INT4OID; Oid constantType = INT4OID;
@ -3939,7 +3999,7 @@ DatumArrayString(Datum *datumArray, uint32 datumCount, Oid datumTypeId)
* CreateBasicTask creates a task, initializes fields that are common to each task, * CreateBasicTask creates a task, initializes fields that are common to each task,
* and returns the created task. * and returns the created task.
*/ */
static Task * Task *
CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString) CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString)
{ {
Task *task = CitusMakeNode(Task); Task *task = CitusMakeNode(Task);

View File

@ -59,6 +59,27 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
originalQuery = copyObject(parse); originalQuery = copyObject(parse);
/*
* We implement INSERT INTO .. SELECT by pushing down the SELECT to
* each shard. To compute that we use the router planner, by adding
* an "uninstantiated" constraint that the partition column be equal to a
* certain value. standard_planner() distributes that constraint to
* the baserestrictinfos to all the tables where it knows how to push
* the restriction safely. An example is that the tables that are
* connected via equi joins.
*
* The router planner then iterates over the target table's shards,
* for each we replace the "uninstantiated" restriction, with one that
* PruneShardList() handles, and then generate a query for that
* individual shard. If any of the involved tables don't prune down
* to a single shard, or if the pruned shards aren't colocated,
* we error out.
*/
if (InsertSelectQuery(parse))
{
AddUninstantiatedPartitionRestriction(parse);
}
} }
/* create a restriction context and put it at the end if context list */ /* create a restriction context and put it at the end if context list */

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,79 @@
/*-------------------------------------------------------------------------
*
* test/src/depase_shard_query.c
*
* This file contains functions to exercise deparsing of INSERT .. SELECT queries
* for distributed tables.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include <stddef.h>
#include "catalog/pg_type.h"
#include "distributed/master_protocol.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/value.h"
#include "tcop/tcopprot.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/palloc.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(deparse_shard_query_test);
Datum
deparse_shard_query_test(PG_FUNCTION_ARGS)
{
text *queryString = PG_GETARG_TEXT_P(0);
char *queryStringChar = text_to_cstring(queryString);
List *parseTreeList = pg_parse_query(queryStringChar);
ListCell *parseTreeCell = NULL;
foreach(parseTreeCell, parseTreeList)
{
Node *parsetree = (Node *) lfirst(parseTreeCell);
ListCell *queryTreeCell = NULL;
List *queryTreeList = pg_analyze_and_rewrite(parsetree, queryStringChar,
NULL, 0);
foreach(queryTreeCell, queryTreeList)
{
Query *query = lfirst(queryTreeCell);
StringInfo buffer = makeStringInfo();
/* reoreder the target list only for INSERT .. SELECT queries */
if (InsertSelectQuery(query))
{
RangeTblEntry *insertRte = linitial(query->rtable);
RangeTblEntry *subqueryRte = lsecond(query->rtable);
ReorderInsertSelectTargetLists(query, insertRte, subqueryRte);
}
deparse_shard_query(query, InvalidOid, 0, buffer);
elog(INFO, "query: %s", buffer->data);
}
}
PG_RETURN_VOID();
}

View File

@ -277,6 +277,7 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_STRING_FIELD(masterTableName); WRITE_STRING_FIELD(masterTableName);
WRITE_BOOL_FIELD(routerExecutable);
} }
@ -493,6 +494,8 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained); WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution); WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery); WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(selectShardList);
} }
#if (PG_VERSION_NUM < 90600) #if (PG_VERSION_NUM < 90600)

View File

@ -184,6 +184,7 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob); READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName); READ_STRING_FIELD(masterTableName);
READ_BOOL_FIELD(routerExecutable);
READ_DONE(); READ_DONE();
} }
@ -289,6 +290,8 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery); READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(selectShardList);
READ_DONE(); READ_DONE();
} }

View File

@ -122,6 +122,7 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool LeafQuery(Query *queryTree); extern bool LeafQuery(Query *queryTree);
extern List * PartitionColumnOpExpressionList(Query *query); extern List * PartitionColumnOpExpressionList(Query *query);
extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn); extern List * ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn);
extern bool IsPartitionColumnRecursive(Expr *columnExpression, Query *query);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */ #endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -169,6 +169,9 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */ uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */ bool upsertQuery; /* only applies to modify tasks */
bool insertSelectQuery;
List *selectShardList; /* only applies INSERT/SELECT tasks */
} Task; } Task;
@ -205,6 +208,7 @@ typedef struct MultiPlan
Job *workerJob; Job *workerJob;
Query *masterQuery; Query *masterQuery;
char *masterTableName; char *masterTableName;
bool routerExecutable;
} MultiPlan; } MultiPlan;
@ -227,6 +231,8 @@ extern int TaskAssignmentPolicy;
/* Function declarations for building physical plans and constructing queries */ /* Function declarations for building physical plans and constructing queries */
extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree); extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree);
extern StringInfo ShardFetchQueryString(uint64 shardId); extern StringInfo ShardFetchQueryString(uint64 shardId);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString);
/* Function declarations for shard pruning */ /* Function declarations for shard pruning */
extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList, extern List * PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
@ -243,9 +249,10 @@ extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval)
extern bool SimpleOpExpression(Expr *clause); extern bool SimpleOpExpression(Expr *clause);
extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn); extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn);
/* helper functions */
extern Var * MakeInt4Column(void);
extern Const * MakeInt4Constant(Datum constantValue);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement); extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
/* Function declarations for sorting shards. */
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval); ShardInterval *secondInterval);

View File

@ -33,7 +33,7 @@ typedef struct XactShardConnSet
extern bool AllModificationsCommutative; extern bool AllModificationsCommutative;
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc);

View File

@ -21,13 +21,21 @@
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
/* reserved parameted id, we chose a negative number since it is not assigned by postgres */
#define UNINSTANTIATED_PARAMETER_ID INT_MIN
/* reserved alias name for UPSERTs */ /* reserved alias name for UPSERTs */
#define UPSERT_ALIAS "citus_table_alias" #define CITUS_TABLE_ALIAS "citus_table_alias"
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType, MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
extern void AddUninstantiatedPartitionRestriction(Query *originalQuery);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree); extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte);
extern bool InsertSelectQuery(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -191,7 +191,6 @@ extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job); extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */ /* Function declarations common to more than one executor */
extern bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan); extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan);
extern void RemoveJobDirectory(uint64 jobId); extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus); extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);

View File

@ -0,0 +1,428 @@
--
-- MULTI_DEPARSE_SHARD_QUERY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000;
CREATE FUNCTION deparse_shard_query_test(text)
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT;
-- create the first table
CREATE TABLE raw_events_1
(tenant_id bigint,
value_1 int,
value_2 int,
value_3 float,
value_4 bigint,
value_5 text,
value_6 int DEfAULT 10,
value_7 int,
event_at date DEfAULT now()
);
SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('raw_events_1', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- create the first table
CREATE TABLE raw_events_2
(tenant_id bigint,
value_1 int,
value_2 int,
value_3 float,
value_4 bigint,
value_5 text,
value_6 float DEfAULT (random()*100)::float,
value_7 int,
event_at date DEfAULT now()
);
SELECT master_create_distributed_table('raw_events_2', 'tenant_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('raw_events_2', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE aggregated_events
(tenant_id bigint,
sum_value_1 bigint,
average_value_2 float,
average_value_3 float,
sum_value_4 bigint,
sum_value_5 float,
average_value_6 int,
rollup_hour date);
SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('aggregated_events', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- start with very simple examples on a single table
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1
SELECT * FROM raw_events_1;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, value_5, value_6, value_7, event_at FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_4)
SELECT
tenant_id, value_4
FROM
raw_events_1;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, event_at) SELECT tenant_id, value_4, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)
-- now that shuffle columns a bit on a single table
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
SELECT
value_2::text, value_5::int, tenant_id, value_4
FROM
raw_events_1;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)
-- same test on two different tables
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
SELECT
value_2::text, value_5::int, tenant_id, value_4
FROM
raw_events_2;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_2, value_4, value_5, value_6, event_at) SELECT tenant_id, (value_5)::integer AS value_5, value_4, (value_2)::text AS value_2, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_2
deparse_shard_query_test
--------------------------
(1 row)
-- lets do some simple aggregations
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4)
SELECT
tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), sum(value_4)
FROM
raw_events_1
GROUP BY
tenant_id, date_trunc(\'hour\', event_at)
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, average_value_3, sum_value_4, average_value_6, rollup_hour) SELECT tenant_id, sum(value_1) AS sum, avg(value_3) AS avg, sum(value_4) AS sum, avg(value_6) AS avg, date_trunc('hour'::text, (event_at)::timestamp with time zone) AS date_trunc FROM public.raw_events_1 GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone))
deparse_shard_query_test
--------------------------
(1 row)
-- also some subqueries, JOINS with a complicated target lists
-- a simple JOIN
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1 (value_3, tenant_id)
SELECT
raw_events_2.value_3, raw_events_1.tenant_id
FROM
raw_events_1, raw_events_2
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_3, value_6, event_at) SELECT raw_events_1.tenant_id, raw_events_2.value_3, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id)
deparse_shard_query_test
--------------------------
(1 row)
-- join with group by
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1 (value_3, tenant_id)
SELECT
max(raw_events_2.value_3), avg(raw_events_1.value_3)
FROM
raw_events_1, raw_events_2
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_3, value_6, event_at) SELECT avg(raw_events_1.value_3) AS avg, max(raw_events_2.value_3) AS max, 10 AS value_6, (now())::date AS event_at FROM public.raw_events_1, public.raw_events_2 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id) GROUP BY raw_events_1.event_at
deparse_shard_query_test
--------------------------
(1 row)
-- a more complicated JOIN
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_4, tenant_id)
SELECT
max(r1.value_4), r3.tenant_id
FROM
raw_events_1 r1, raw_events_2 r2, raw_events_1 r3
WHERE
r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id
GROUP BY
r1.value_1, r3.tenant_id, r2.event_at
ORDER BY
r2.event_at DESC;
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4) SELECT r3.tenant_id, max(r1.value_4) AS max FROM public.raw_events_1 r1, public.raw_events_2 r2, public.raw_events_1 r3 WHERE ((r1.tenant_id = r2.tenant_id) AND (r2.tenant_id = r3.tenant_id)) GROUP BY r1.value_1, r3.tenant_id, r2.event_at ORDER BY r2.event_at DESC
deparse_shard_query_test
--------------------------
(1 row)
-- queries with CTEs are supported
SELECT deparse_shard_query_test('
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
INSERT INTO aggregated_events (rollup_hour, sum_value_5, tenant_id)
SELECT
event_at, sum(value_5::int), tenant_id
FROM
raw_events_1
GROUP BY
event_at, tenant_id;
');
INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5, rollup_hour) SELECT tenant_id, sum((value_5)::integer) AS sum, event_at FROM public.raw_events_1 GROUP BY event_at, tenant_id
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
INSERT INTO aggregated_events (sum_value_5, tenant_id)
SELECT
sum(value_5::int), tenant_id
FROM
raw_events_1
GROUP BY
event_at, tenant_id;
');
INFO: query: WITH first_tenant AS (SELECT raw_events_1.event_at, raw_events_1.value_5, raw_events_1.tenant_id FROM public.raw_events_1) INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, sum((value_5)::integer) AS sum FROM public.raw_events_1 GROUP BY event_at, tenant_id
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id)
WITH RECURSIVE hierarchy as (
SELECT value_1, 1 AS LEVEL, tenant_id
FROM raw_events_1
WHERE tenant_id = 1
UNION
SELECT re.value_2, (h.level+1), re.tenant_id
FROM hierarchy h JOIN raw_events_1 re
ON (h.tenant_id = re.tenant_id AND
h.value_1 = re.value_6))
SELECT * FROM hierarchy WHERE LEVEL <= 2;
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) WITH RECURSIVE hierarchy AS (SELECT raw_events_1.value_1, 1 AS level, raw_events_1.tenant_id FROM public.raw_events_1 WHERE (raw_events_1.tenant_id = 1) UNION SELECT re.value_2, (h.level + 1), re.tenant_id FROM (hierarchy h JOIN public.raw_events_1 re ON (((h.tenant_id = re.tenant_id) AND (h.value_1 = re.value_6))))) SELECT tenant_id, value_1, level FROM hierarchy WHERE (level <= 2)
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1)
SELECT
DISTINCT value_1
FROM
raw_events_1;
');
INFO: query: INSERT INTO public.aggregated_events (sum_value_1) SELECT DISTINCT value_1 FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)
-- many filters suffled
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id)
SELECT value_3, value_2, tenant_id
FROM raw_events_1
WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000);
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, value_2, value_3 FROM public.raw_events_1 WHERE (((value_5 ~~ '%s'::text) OR (value_5 ~~ '%a'::text)) AND (tenant_id = 1) AND ((value_6 < 3000) OR (value_3 > (8000)::double precision)))
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, tenant_id)
SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id
FROM raw_events_1
WHERE event_at = now();
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_5) SELECT tenant_id, rank() OVER (PARTITION BY tenant_id ORDER BY value_6) AS rank FROM public.raw_events_1 WHERE (event_at = now())
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4)
SELECT random(), int4eq(1, max(value_1))::int, value_6
FROM raw_events_1
WHERE event_at = now()
GROUP BY event_at, value_7, value_6;
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_4, sum_value_5) SELECT (int4eq(1, max(value_1)))::integer AS int4eq, value_6, random() AS random FROM public.raw_events_1 WHERE (event_at = now()) GROUP BY event_at, value_7, value_6
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1, tenant_id)
SELECT
count(DISTINCT CASE
WHEN
value_1 > 100
THEN
tenant_id
ELSE
value_6
END) as c,
max(tenant_id)
FROM
raw_events_1;
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1) SELECT max(tenant_id) AS max, count(DISTINCT CASE WHEN (value_1 > 100) THEN tenant_id ELSE (value_6)::bigint END) AS c FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_7, value_1, tenant_id)
SELECT
value_7, value_1, tenant_id
FROM
(SELECT
tenant_id, value_2 as value_7, value_1
FROM
raw_events_2
) as foo
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_1, value_6, value_7, event_at) SELECT tenant_id, value_1, 10 AS value_6, value_7, (now())::date AS event_at FROM (SELECT raw_events_2.tenant_id, raw_events_2.value_2 AS value_7, raw_events_2.value_1 FROM public.raw_events_2) foo
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5)
SELECT
sum(value_1), tenant_id, sum(value_5::bigint)
FROM
(SELECT
raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1
FROM
raw_events_2, raw_events_1
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id
) as foo
GROUP BY
tenant_id, date_trunc(\'hour\', event_at)
');
INFO: query: INSERT INTO public.aggregated_events (tenant_id, sum_value_1, sum_value_5) SELECT tenant_id, sum(value_1) AS sum, sum((value_5)::bigint) AS sum FROM (SELECT raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1 FROM public.raw_events_2, public.raw_events_1 WHERE (raw_events_1.tenant_id = raw_events_2.tenant_id)) foo GROUP BY tenant_id, (date_trunc('hour'::text, (event_at)::timestamp with time zone))
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test(E'
INSERT INTO raw_events_2(tenant_id, value_1, value_2, value_3, value_4)
SELECT
tenant_id, value_1, value_2, value_3, value_4
FROM
(SELECT
value_2, value_4, tenant_id, value_1, value_3
FROM
raw_events_1
) as foo
');
INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT tenant_id, value_1, value_2, value_3, value_4, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo
deparse_shard_query_test
--------------------------
(1 row)
SELECT deparse_shard_query_test(E'
INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3)
SELECT
*
FROM
(SELECT
value_2, value_4, tenant_id, value_1, value_3
FROM
raw_events_1
) as foo
');
INFO: query: INSERT INTO public.raw_events_2 (tenant_id, value_1, value_2, value_3, value_4, value_6, event_at) SELECT value_2, value_4, value_1, value_3, tenant_id, (random() * (100)::double precision) AS value_6, (now())::date AS event_at FROM (SELECT raw_events_1.value_2, raw_events_1.value_4, raw_events_1.tenant_id, raw_events_1.value_1, raw_events_1.value_3 FROM public.raw_events_1) foo
deparse_shard_query_test
--------------------------
(1 row)
-- use a column multiple times
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
SELECT
tenant_id, value_7, value_7
FROM
raw_events_1
ORDER BY
value_2, value_1;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_7, 10 AS value_6, value_7, (now())::date AS event_at FROM public.raw_events_1 ORDER BY value_2, value_1
deparse_shard_query_test
--------------------------
(1 row)
-- test dropped table as well
ALTER TABLE raw_events_1 DROP COLUMN value_5;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
SELECT
tenant_id, value_7, value_4
FROM
raw_events_1;
');
INFO: query: INSERT INTO public.raw_events_1 (tenant_id, value_4, value_6, value_7, event_at) SELECT tenant_id, value_4, 10 AS value_6, value_7, (now())::date AS event_at FROM public.raw_events_1
deparse_shard_query_test
--------------------------
(1 row)

File diff suppressed because it is too large Load Diff

View File

@ -205,10 +205,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
ERROR: cannot perform distributed planning for the given modification ERROR: cannot perform distributed planning for the given modification
DETAIL: Multi-row INSERTs to distributed tables are not supported. DETAIL: Multi-row INSERTs to distributed tables are not supported.
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported -- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders; -- INSERT INTO limit_orders SELECT * FROM limit_orders;
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES; INSERT INTO limit_orders DEFAULT VALUES;

View File

@ -29,9 +29,12 @@ test: multi_create_table_constraints
test: multi_master_protocol test: multi_master_protocol
test: multi_load_data test: multi_load_data
test: multi_insert_select
# ---------- # ----------
# Miscellaneous tests to check our query planning behavior # Miscellaneous tests to check our query planning behavior
# ---------- # ----------
test: multi_deparse_shard_query
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
test: multi_explain test: multi_explain
test: multi_subquery test: multi_subquery

View File

@ -0,0 +1,304 @@
--
-- MULTI_DEPARSE_SHARD_QUERY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13100000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13100000;
CREATE FUNCTION deparse_shard_query_test(text)
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT;
-- create the first table
CREATE TABLE raw_events_1
(tenant_id bigint,
value_1 int,
value_2 int,
value_3 float,
value_4 bigint,
value_5 text,
value_6 int DEfAULT 10,
value_7 int,
event_at date DEfAULT now()
);
SELECT master_create_distributed_table('raw_events_1', 'tenant_id', 'hash');
SELECT master_create_worker_shards('raw_events_1', 4, 1);
-- create the first table
CREATE TABLE raw_events_2
(tenant_id bigint,
value_1 int,
value_2 int,
value_3 float,
value_4 bigint,
value_5 text,
value_6 float DEfAULT (random()*100)::float,
value_7 int,
event_at date DEfAULT now()
);
SELECT master_create_distributed_table('raw_events_2', 'tenant_id', 'hash');
SELECT master_create_worker_shards('raw_events_2', 4, 1);
CREATE TABLE aggregated_events
(tenant_id bigint,
sum_value_1 bigint,
average_value_2 float,
average_value_3 float,
sum_value_4 bigint,
sum_value_5 float,
average_value_6 int,
rollup_hour date);
SELECT master_create_distributed_table('aggregated_events', 'tenant_id', 'hash');
SELECT master_create_worker_shards('aggregated_events', 4, 1);
-- start with very simple examples on a single table
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1
SELECT * FROM raw_events_1;
');
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_4)
SELECT
tenant_id, value_4
FROM
raw_events_1;
');
-- now that shuffle columns a bit on a single table
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
SELECT
value_2::text, value_5::int, tenant_id, value_4
FROM
raw_events_1;
');
-- same test on two different tables
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_5, value_2, tenant_id, value_4)
SELECT
value_2::text, value_5::int, tenant_id, value_4
FROM
raw_events_2;
');
-- lets do some simple aggregations
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (tenant_id, rollup_hour, sum_value_1, average_value_3, average_value_6, sum_value_4)
SELECT
tenant_id, date_trunc(\'hour\', event_at) , sum(value_1), avg(value_3), avg(value_6), sum(value_4)
FROM
raw_events_1
GROUP BY
tenant_id, date_trunc(\'hour\', event_at)
');
-- also some subqueries, JOINS with a complicated target lists
-- a simple JOIN
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1 (value_3, tenant_id)
SELECT
raw_events_2.value_3, raw_events_1.tenant_id
FROM
raw_events_1, raw_events_2
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id;
');
-- join with group by
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1 (value_3, tenant_id)
SELECT
max(raw_events_2.value_3), avg(raw_events_1.value_3)
FROM
raw_events_1, raw_events_2
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id GROUP BY raw_events_1.event_at
');
-- a more complicated JOIN
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_4, tenant_id)
SELECT
max(r1.value_4), r3.tenant_id
FROM
raw_events_1 r1, raw_events_2 r2, raw_events_1 r3
WHERE
r1.tenant_id = r2.tenant_id AND r2.tenant_id = r3.tenant_id
GROUP BY
r1.value_1, r3.tenant_id, r2.event_at
ORDER BY
r2.event_at DESC;
');
-- queries with CTEs are supported
SELECT deparse_shard_query_test('
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
INSERT INTO aggregated_events (rollup_hour, sum_value_5, tenant_id)
SELECT
event_at, sum(value_5::int), tenant_id
FROM
raw_events_1
GROUP BY
event_at, tenant_id;
');
SELECT deparse_shard_query_test('
WITH first_tenant AS (SELECT event_at, value_5, tenant_id FROM raw_events_1)
INSERT INTO aggregated_events (sum_value_5, tenant_id)
SELECT
sum(value_5::int), tenant_id
FROM
raw_events_1
GROUP BY
event_at, tenant_id;
');
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1, sum_value_5, tenant_id)
WITH RECURSIVE hierarchy as (
SELECT value_1, 1 AS LEVEL, tenant_id
FROM raw_events_1
WHERE tenant_id = 1
UNION
SELECT re.value_2, (h.level+1), re.tenant_id
FROM hierarchy h JOIN raw_events_1 re
ON (h.tenant_id = re.tenant_id AND
h.value_1 = re.value_6))
SELECT * FROM hierarchy WHERE LEVEL <= 2;
');
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1)
SELECT
DISTINCT value_1
FROM
raw_events_1;
');
-- many filters suffled
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, sum_value_1, tenant_id)
SELECT value_3, value_2, tenant_id
FROM raw_events_1
WHERE (value_5 like \'%s\' or value_5 like \'%a\') and (tenant_id = 1) and (value_6 < 3000 or value_3 > 8000);
');
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, tenant_id)
SELECT rank() OVER (PARTITION BY tenant_id ORDER BY value_6), tenant_id
FROM raw_events_1
WHERE event_at = now();
');
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events (sum_value_5, tenant_id, sum_value_4)
SELECT random(), int4eq(1, max(value_1))::int, value_6
FROM raw_events_1
WHERE event_at = now()
GROUP BY event_at, value_7, value_6;
');
SELECT deparse_shard_query_test('
INSERT INTO aggregated_events (sum_value_1, tenant_id)
SELECT
count(DISTINCT CASE
WHEN
value_1 > 100
THEN
tenant_id
ELSE
value_6
END) as c,
max(tenant_id)
FROM
raw_events_1;
');
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(value_7, value_1, tenant_id)
SELECT
value_7, value_1, tenant_id
FROM
(SELECT
tenant_id, value_2 as value_7, value_1
FROM
raw_events_2
) as foo
');
SELECT deparse_shard_query_test(E'
INSERT INTO aggregated_events(sum_value_1, tenant_id, sum_value_5)
SELECT
sum(value_1), tenant_id, sum(value_5::bigint)
FROM
(SELECT
raw_events_1.event_at, raw_events_2.tenant_id, raw_events_2.value_5, raw_events_1.value_1
FROM
raw_events_2, raw_events_1
WHERE
raw_events_1.tenant_id = raw_events_2.tenant_id
) as foo
GROUP BY
tenant_id, date_trunc(\'hour\', event_at)
');
SELECT deparse_shard_query_test(E'
INSERT INTO raw_events_2(tenant_id, value_1, value_2, value_3, value_4)
SELECT
tenant_id, value_1, value_2, value_3, value_4
FROM
(SELECT
value_2, value_4, tenant_id, value_1, value_3
FROM
raw_events_1
) as foo
');
SELECT deparse_shard_query_test(E'
INSERT INTO raw_events_2(tenant_id, value_1, value_4, value_2, value_3)
SELECT
*
FROM
(SELECT
value_2, value_4, tenant_id, value_1, value_3
FROM
raw_events_1
) as foo
');
-- use a column multiple times
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
SELECT
tenant_id, value_7, value_7
FROM
raw_events_1
ORDER BY
value_2, value_1;
');
-- test dropped table as well
ALTER TABLE raw_events_1 DROP COLUMN value_5;
SELECT deparse_shard_query_test('
INSERT INTO raw_events_1(tenant_id, value_7, value_4)
SELECT
tenant_id, value_7, value_4
FROM
raw_events_1;
');

View File

@ -0,0 +1,608 @@
--
-- MULTI_INSERT_SELECT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000;
-- create co-located tables
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
SELECT create_distributed_table('raw_events_first', 'user_id');
CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
SELECT create_distributed_table('raw_events_second', 'user_id');
CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg));
SELECT create_distributed_table('agg_events', 'user_id');;
-- create the reference table as well
CREATE TABLE reference_table (user_id int);
SELECT create_reference_table('reference_table');
-- set back to the defaults
SET citus.shard_count = DEFAULT;
SET citus.shard_replication_factor = DEFAULT;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(1, now(), 10, 100, 1000.1, 10000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(2, now(), 20, 200, 2000.1, 20000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(3, now(), 30, 300, 3000.1, 30000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(4, now(), 40, 400, 4000.1, 40000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(5, now(), 50, 500, 5000.1, 50000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(6, now(), 60, 600, 6000.1, 60000);
SET client_min_messages TO DEBUG4;
-- raw table to raw table
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
-- see that our first multi shard INSERT...SELECT works expected
SET client_min_messages TO INFO;
SELECT
raw_events_first.user_id
FROM
raw_events_first, raw_events_second
WHERE
raw_events_first.user_id = raw_events_second.user_id;
-- see that we get unique vialitons
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
-- add one more row
INSERT INTO raw_events_first (user_id, time) VALUES
(7, now());
-- try a single shard query
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7;
SET client_min_messages TO INFO;
-- add one more row
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(8, now(), 80, 800, 8000, 80000);
-- reorder columns
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
user_id = 8;
-- a zero shard select
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
false;
-- another zero shard select
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
SELECT
value_2, value_1, value_3, value_4, user_id, time
FROM
raw_events_first
WHERE
0 != 0;
-- add one more row
SET client_min_messages TO INFO;
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(9, now(), 90, 900, 9000, 90000);
-- show that RETURNING also works
SET client_min_messages TO DEBUG4;
INSERT INTO raw_events_second (user_id, value_1, value_3)
SELECT
user_id, value_1, value_3
FROM
raw_events_first
WHERE
value_3 = 9000
RETURNING *;
-- hits two shards
INSERT INTO raw_events_second (user_id, value_1, value_3)
SELECT
user_id, value_1, value_3
FROM
raw_events_first
WHERE
user_id = 9 OR user_id = 16
RETURNING *;
-- now do some aggregations
INSERT INTO agg_events
SELECT
user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4)
FROM
raw_events_first
GROUP BY
user_id;
-- group by column not exists on the SELECT target list
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
SELECT
sum(value_3), count(value_4), sum(value_1), user_id
FROM
raw_events_first
GROUP BY
value_2, user_id
RETURNING *;
-- some subquery tests
INSERT INTO agg_events
(value_1_agg,
user_id)
SELECT SUM(value_1),
id
FROM (SELECT raw_events_second.user_id AS id,
raw_events_second.value_1
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo
GROUP BY id;
-- subquery one more level depth
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo;
-- join between subqueries
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- add one more level subqueris on top of subquery JOINs
INSERT INTO agg_events
(user_id, value_4_agg)
SELECT
outer_most.id, max(outer_most.value)
FROM
(
SELECT f2.id as id, f2.v4 as value FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id)) as outer_most
GROUP BY
outer_most.id;
-- subqueries in WHERE clause
INSERT INTO raw_events_second
(user_id)
SELECT user_id
FROM raw_events_first
WHERE user_id IN (SELECT user_id
FROM raw_events_second
WHERE user_id = 2);
-- some UPSERTS
INSERT INTO agg_events AS ae
(
user_id,
value_1_agg,
agg_time
)
SELECT user_id,
value_1,
time
FROM raw_events_first
ON conflict (user_id, value_1_agg)
DO UPDATE
SET agg_time = EXCLUDED.agg_time
WHERE ae.agg_time < EXCLUDED.agg_time;
-- upserts with returning
INSERT INTO agg_events AS ae
(
user_id,
value_1_agg,
agg_time
)
SELECT user_id,
value_1,
time
FROM raw_events_first
ON conflict (user_id, value_1_agg)
DO UPDATE
SET agg_time = EXCLUDED.agg_time
WHERE ae.agg_time < EXCLUDED.agg_time
RETURNING user_id, value_1_agg;
INSERT INTO agg_events (user_id, value_1_agg)
SELECT
user_id, sum(value_1 + value_2)
FROM
raw_events_first GROUP BY user_id;
-- FILTER CLAUSE
INSERT INTO agg_events (user_id, value_1_agg)
SELECT
user_id, sum(value_1 + value_2) FILTER (where value_3 = 15)
FROM
raw_events_first GROUP BY user_id;
-- a test with reference table JOINs
INSERT INTO
agg_events (user_id, value_1_agg)
SELECT
raw_events_first.user_id, sum(value_1)
FROM
reference_table, raw_events_first
WHERE
raw_events_first.user_id = reference_table.user_id
GROUP BY
raw_events_first.user_id;
-- a note on the outer joins is that
-- we filter out outer join results
-- where partition column returns
-- NULL. Thus, we could INSERT less rows
-- than we expect from subquery result.
-- see the following tests
SET client_min_messages TO INFO;
-- we don't want to see constraint vialotions, so truncate first
TRUNCATE agg_events;
-- add a row to first table to make table contents different
INSERT INTO raw_events_second (user_id, time, value_1, value_2, value_3, value_4) VALUES
(10, now(), 100, 10000, 10000, 100000);
DELETE FROM raw_events_second WHERE user_id = 2;
-- we select 11 rows
SELECT t1.user_id AS col1,
t2.user_id AS col2
FROM raw_events_first t1
FULL JOIN raw_events_second t2
ON t1.user_id = t2.user_id
ORDER BY t1.user_id,
t2.user_id;
SET client_min_messages TO DEBUG4;
-- we insert 10 rows since we filtered out
-- NULL partition column values
INSERT INTO agg_events (user_id, value_1_agg)
SELECT t1.user_id AS col1,
t2.user_id AS col2
FROM raw_events_first t1
FULL JOIN raw_events_second t2
ON t1.user_id = t2.user_id;
SET client_min_messages TO INFO;
-- see that the results are different from the SELECT query
SELECT
user_id, value_1_agg
FROM
agg_events
ORDER BY
user_id, value_1_agg;
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
TRUNCATE agg_events;
SET client_min_messages TO DEBUG4;
-- DISTINCT clause
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT value_1, user_id
FROM
raw_events_first;
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG4;
-- we do not support DISTINCT ON clauses
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (value_1) value_1, user_id
FROM
raw_events_first;
-- We do not support some CTEs
WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)
INSERT INTO agg_events
(value_1_agg, user_id)
SELECT
v1_agg, user_id
FROM
fist_table_agg;
-- We do support some CTEs
INSERT INTO agg_events
WITH sub_cte AS (SELECT 1)
SELECT
raw_events_first.user_id, (SELECT * FROM sub_cte)
FROM
raw_events_first;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) INTERSECT
(SELECT user_id FROM raw_events_first);
-- We do not support any set operations
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
-- unsupported JOIN
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id != raw_events_second.user_id
GROUP BY raw_events_second.user_id) AS foo;
-- INSERT partition column does not match with SELECT partition column
INSERT INTO agg_events
(value_4_agg,
value_1_agg,
user_id)
SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.value_3 AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_3) AS foo;
-- error cases
-- no part column at all
INSERT INTO raw_events_second
(value_1)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(value_1)
SELECT user_id
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id * 2
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id :: bigint
FROM raw_events_first;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
value_2
FROM raw_events_first
GROUP BY user_id,
value_2;
-- tables should be co-located
INSERT INTO agg_events (user_id)
SELECT
user_id
FROM
reference_table;
-- unsupported joins between subqueries
-- we do not return bare partition column on the inner query
INSERT INTO agg_events
(user_id)
SELECT f2.id FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second.value_1 AS v1,
SUM(raw_events_second.user_id) AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- the second part of the query is not routable since
-- no GROUP BY on the partition column
INSERT INTO agg_events
(user_id)
SELECT f.id FROM
(SELECT
id
FROM (SELECT raw_events_first.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second.value_1 AS v1,
SUM(raw_events_second.user_id) AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id);
-- cannot pushdown the query since the JOIN is not equi JOIN
INSERT INTO agg_events
(user_id, value_4_agg)
SELECT
outer_most.id, max(outer_most.value)
FROM
(
SELECT f2.id as id, f2.v4 as value FROM
(SELECT
id
FROM (SELECT reference_table.user_id AS id
FROM raw_events_first,
reference_table
WHERE raw_events_first.user_id = reference_table.user_id ) AS foo) as f
INNER JOIN
(SELECT v4,
v1,
id
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
SUM(raw_events_first.value_1) AS v1,
raw_events_second.user_id AS id
FROM raw_events_first,
raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.user_id
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id != f2.id)) as outer_most
GROUP BY outer_most.id;
-- we currently not support grouping sets
INSERT INTO agg_events
(user_id,
value_1_agg,
value_2_agg)
SELECT user_id,
Sum(value_1) AS sum_val1,
Sum(value_2) AS sum_val2
FROM raw_events_second
GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) );
-- set back to INFO
SET client_min_messages TO INFO;
-- Views does not work
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
INSERT INTO raw_events_second SELECT * FROM test_view;

View File

@ -150,8 +150,9 @@ DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::times
-- commands with multiple rows are unsupported -- commands with multiple rows are unsupported
INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
-- Who says that? :)
-- INSERT ... SELECT ... FROM commands are unsupported -- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders; -- INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands containing a CTE are unsupported -- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)