mirror of https://github.com/citusdata/citus.git
Apply first pass of review feedback
parent
3bcd4ef83e
commit
1f74364b23
|
@ -57,23 +57,18 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||||
executorType = JobExecutorType(multiPlan);
|
executorType = JobExecutorType(multiPlan);
|
||||||
if (executorType == MULTI_EXECUTOR_ROUTER)
|
if (executorType == MULTI_EXECUTOR_ROUTER)
|
||||||
{
|
{
|
||||||
Task *task = NULL;
|
|
||||||
List *taskList = workerJob->taskList;
|
|
||||||
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
|
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
|
||||||
planStatement->planTree->targetlist, false);
|
planStatement->planTree->targetlist, false);
|
||||||
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
|
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
|
||||||
|
|
||||||
/* router executor can only execute distributed plans with a single task */
|
/* router executor cannot execute task with depencencies */
|
||||||
Assert(list_length(taskList) == 1);
|
|
||||||
Assert(dependendJobList == NIL);
|
Assert(dependendJobList == NIL);
|
||||||
|
|
||||||
task = (Task *) linitial(taskList);
|
|
||||||
|
|
||||||
/* we need to set tupleDesc in executorStart */
|
/* we need to set tupleDesc in executorStart */
|
||||||
queryDesc->tupDesc = tupleDescriptor;
|
queryDesc->tupDesc = tupleDescriptor;
|
||||||
|
|
||||||
/* drop into the router executor */
|
/* drop into the router executor */
|
||||||
RouterExecutorStart(queryDesc, eflags, task);
|
RouterExecutorStart(queryDesc, eflags);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -133,14 +133,11 @@ static void MarkRemainingInactivePlacements(void);
|
||||||
* execution.
|
* execution.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
RouterExecutorStart(QueryDesc *queryDesc, int eflags)
|
||||||
{
|
{
|
||||||
EState *executorState = NULL;
|
EState *executorState = NULL;
|
||||||
CmdType commandType = queryDesc->operation;
|
CmdType commandType = queryDesc->operation;
|
||||||
|
|
||||||
/* ensure that the task is not NULL */
|
|
||||||
Assert(task != NULL);
|
|
||||||
|
|
||||||
/* disallow triggers during distributed modify commands */
|
/* disallow triggers during distributed modify commands */
|
||||||
if (commandType != CMD_SELECT)
|
if (commandType != CMD_SELECT)
|
||||||
{
|
{
|
||||||
|
|
|
@ -127,8 +127,8 @@ RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
|
||||||
List *workerDependentTaskList = NIL;
|
List *workerDependentTaskList = NIL;
|
||||||
bool masterQueryHasAggregates = false;
|
bool masterQueryHasAggregates = false;
|
||||||
|
|
||||||
/* router executor cannot execute queries that hit more than one shard */
|
/* router executor cannot execute queries that hit zero shards */
|
||||||
if (taskCount != 1)
|
if (taskCount == 0)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -141,6 +141,12 @@ RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* router executor cannot execute SELECT queries that hit more than one shard */
|
||||||
|
if (taskCount != 1)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
|
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -72,8 +72,10 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
* then generate a query for that individual shard. If any of the
|
* then generate a query for that individual shard. If any of the
|
||||||
* involved tables don't prune down to a single shard, or if the
|
* involved tables don't prune down to a single shard, or if the
|
||||||
* pruned shards aren't colocated, we error out.
|
* pruned shards aren't colocated, we error out.
|
||||||
|
*
|
||||||
|
* TODO: we currently not support CTEs.
|
||||||
*/
|
*/
|
||||||
if (InsertSelectQuery(parse))
|
if (InsertSelectQuery(parse) && parse->cteList == NULL)
|
||||||
{
|
{
|
||||||
AddHiddenPartitionColumnParameter(parse);
|
AddHiddenPartitionColumnParameter(parse);
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,12 +106,14 @@ static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecut
|
||||||
static RelationRestrictionContext * copyRelationRestrictionContext(
|
static RelationRestrictionContext * copyRelationRestrictionContext(
|
||||||
RelationRestrictionContext *oldContext);
|
RelationRestrictionContext *oldContext);
|
||||||
static Node * ReplaceHiddenParameter(Node *node, void *context);
|
static Node * ReplaceHiddenParameter(Node *node, void *context);
|
||||||
static Var * MakeInt4Column();
|
static Var * MakeInt4Column(void);
|
||||||
static Const * MakeInt4Constant(Datum constantValue);
|
static Const * MakeInt4Constant(Datum constantValue);
|
||||||
static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree);
|
static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree);
|
||||||
|
static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query);
|
||||||
static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query);
|
static void ErrorIfNotAllParticipatingTablesAreColocated(Query *query);
|
||||||
static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query);
|
static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query);
|
||||||
static void AddHiddenParameterToFirstTableRecursively(Query *query);
|
static Query * FirstQueryReferencingDistributedTable(Query *query);
|
||||||
|
static void AddHiddenParameterToFirstTable(Query *query);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -249,6 +251,7 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
||||||
List *insertShardPlacementList = NULL;
|
List *insertShardPlacementList = NULL;
|
||||||
List *intersectedPlacementList = NULL;
|
List *intersectedPlacementList = NULL;
|
||||||
|
RangeTblEntry *rangeTableEntry = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Replace the magic value in all baserestrictinfos. Note that
|
* Replace the magic value in all baserestrictinfos. Note that
|
||||||
|
@ -276,7 +279,11 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
|
|
||||||
if (routerQuery == NULL)
|
if (routerQuery == NULL)
|
||||||
{
|
{
|
||||||
elog(ERROR, "couldn't prune down sufficiently for insert pushdown");
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"Select query cannot be pushed down to the worker.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Ensure that we have INSERTed table's placement exists on the same worker */
|
/* Ensure that we have INSERTed table's placement exists on the same worker */
|
||||||
|
@ -287,21 +294,34 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
if (list_length(insertShardPlacementList) != list_length(
|
if (list_length(insertShardPlacementList) != list_length(
|
||||||
intersectedPlacementList))
|
intersectedPlacementList))
|
||||||
{
|
{
|
||||||
ereport(DEBUG2, (errmsg("insert table does not have the same placements on "
|
ereport(DEBUG2, (errmsg("skipping the task"),
|
||||||
"the select placement list. Skipping this task")));
|
errdetail("Insert query hits %d placements, Select query "
|
||||||
|
"hits %d placements and only %d of those placements match.",
|
||||||
|
list_length(insertShardPlacementList), list_length(
|
||||||
|
selectPlacementList),
|
||||||
|
list_length(intersectedPlacementList))));
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* this is required for correct deparsing of the query */
|
||||||
ReorderInsertSelectTargetListsIfExists(copiedOriginal);
|
ReorderInsertSelectTargetListsIfExists(copiedOriginal);
|
||||||
|
|
||||||
|
/* setting an alias simplifies deparsing of RETURNING */
|
||||||
|
rangeTableEntry = linitial(copiedOriginal->rtable);
|
||||||
|
if (rangeTableEntry->alias == NULL)
|
||||||
|
{
|
||||||
|
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||||
|
rangeTableEntry->alias = alias;
|
||||||
|
}
|
||||||
|
|
||||||
/* and generate the full query string */
|
/* and generate the full query string */
|
||||||
deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId,
|
deparse_shard_query(copiedOriginal, distributedTableId, shardInterval->shardId,
|
||||||
queryString);
|
queryString);
|
||||||
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
|
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
|
||||||
|
|
||||||
|
|
||||||
sqlTask = CreateBasicTask(jobId, taskIdIndex++, SQL_TASK, queryString->data);
|
sqlTask = CreateBasicTask(jobId, taskIdIndex++, MODIFY_TASK, queryString->data);
|
||||||
sqlTask->dependedTaskList = NULL;
|
sqlTask->dependedTaskList = NULL;
|
||||||
sqlTask->anchorShardId = shardId;
|
sqlTask->anchorShardId = shardId;
|
||||||
sqlTask->taskPlacementList = insertShardPlacementList;
|
sqlTask->taskPlacementList = insertShardPlacementList;
|
||||||
|
@ -336,18 +356,14 @@ CreateMultiTaskRouterPlan(Query *originalQuery, Query *query,
|
||||||
static void
|
static void
|
||||||
ErrorIfInsertSelectQueryNotSupported(Query *queryTree)
|
ErrorIfInsertSelectQueryNotSupported(Query *queryTree)
|
||||||
{
|
{
|
||||||
RangeTblEntry *insertRte = NULL;
|
|
||||||
RangeTblEntry *subqueryRte = NULL;
|
RangeTblEntry *subqueryRte = NULL;
|
||||||
Query *subquery = NULL;
|
Query *subquery = NULL;
|
||||||
Oid insertRelationId = InvalidOid;
|
|
||||||
|
|
||||||
/* we only do this check for INSERT ... SELECT queries */
|
/* we only do this check for INSERT ... SELECT queries */
|
||||||
AssertArg(InsertSelectQuery(queryTree));
|
AssertArg(InsertSelectQuery(queryTree));
|
||||||
|
|
||||||
insertRte = linitial(queryTree->rtable);
|
|
||||||
subqueryRte = lsecond(queryTree->rtable);
|
subqueryRte = lsecond(queryTree->rtable);
|
||||||
subquery = subqueryRte->subquery;
|
subquery = subqueryRte->subquery;
|
||||||
insertRelationId = insertRte->relid;
|
|
||||||
|
|
||||||
/* we support this feature only for colocated tables */
|
/* we support this feature only for colocated tables */
|
||||||
ErrorIfNotAllParticipatingTablesAreColocated(queryTree);
|
ErrorIfNotAllParticipatingTablesAreColocated(queryTree);
|
||||||
|
@ -355,36 +371,93 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree)
|
||||||
if (contain_mutable_functions((Node *) queryTree))
|
if (contain_mutable_functions((Node *) queryTree))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("non-IMMUTABLE functions are not allowed in INSERT ... "
|
errmsg("cannot perform distributed planning for the given "
|
||||||
"SELECT queries")));
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"Stable and volatile functions are not allowed in INSERT ... "
|
||||||
|
"SELECT queries")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subquery->limitCount != NULL)
|
if (queryTree->cteList != NULL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("LIMIT clause are not allowed in INSERT ... SELECT "
|
errmsg("cannot perform distributed planning for the given "
|
||||||
"queries")));
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"Common table expressions are not allowed in INSERT ... SELECT "
|
||||||
|
"queries")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subquery->limitOffset != NULL)
|
/* we don't support LIMIT, OFFSET and WINDOW functions */
|
||||||
{
|
ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery);
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("OFFSET clause are not allowed in INSERT ... SELECT "
|
|
||||||
"queries")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*TODO: check with Andres. Should we allow on partition column? I'm cool with not having window functions */
|
|
||||||
if (subquery->windowClause != NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("Window functions are not allowed in INSERT ... SELECT "
|
|
||||||
"queries")));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/* ensure that INSERT's partition column comes from SELECT's partition column */
|
||||||
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree);
|
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ErrorUnsupportedMultiTaskSelectQuery errors out on queries that we support
|
||||||
|
* for single task router queries, but, cannot allow for multi task router
|
||||||
|
* queries. We do these checks recursively to prevent any wrong results.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query)
|
||||||
|
{
|
||||||
|
List *queryList = NIL;
|
||||||
|
ListCell *queryCell = NULL;
|
||||||
|
|
||||||
|
ExtractQueryWalker((Node *) query, &queryList);
|
||||||
|
foreach(queryCell, queryList)
|
||||||
|
{
|
||||||
|
Query *subquery = (Query *) lfirst(queryCell);
|
||||||
|
|
||||||
|
Assert(subquery->commandType == CMD_SELECT);
|
||||||
|
|
||||||
|
if (subquery->limitCount != NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"LIMIT clauses are not allowed in INSERT ... SELECT "
|
||||||
|
"queries")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (subquery->limitOffset != NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"OFFSET clauses are not allowed in INSERT ... SELECT "
|
||||||
|
"queries")
|
||||||
|
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (subquery->windowClause != NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"Window functions are not allowed in INSERT ... SELECT "
|
||||||
|
"queries")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (subquery->setOperations != NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given "
|
||||||
|
"modification"),
|
||||||
|
errdetail(
|
||||||
|
"Union, Intersect and Except are currently unsupported are not allowed in INSERT ... SELECT "
|
||||||
|
"queries")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfNotAllParticipatingTablesAreColocated errors out of all tables
|
* ErrorIfNotAllParticipatingTablesAreColocated errors out of all tables
|
||||||
|
@ -428,8 +501,6 @@ ErrorIfNotAllParticipatingTablesAreColocated(Query *query)
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("all participating tables should be colocated")));
|
errmsg("all participating tables should be colocated")));
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -497,8 +568,7 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AddHiddenPartitionColumnParameter() can only be used with
|
* AddHiddenPartitionColumnParameter() can only be used with
|
||||||
* INSERT ... SELECT queries. We add this hidden parameter to
|
* INSERT ... SELECT queries.
|
||||||
* recursively for subqueries.
|
|
||||||
*
|
*
|
||||||
* If the input query is not INSERT .. SELECT the function errors-out.
|
* If the input query is not INSERT .. SELECT the function errors-out.
|
||||||
*/
|
*/
|
||||||
|
@ -506,26 +576,68 @@ void
|
||||||
AddHiddenPartitionColumnParameter(Query *originalQuery)
|
AddHiddenPartitionColumnParameter(Query *originalQuery)
|
||||||
{
|
{
|
||||||
Query *subquery = NULL;
|
Query *subquery = NULL;
|
||||||
|
Query *referencedSubquery = NULL;
|
||||||
RangeTblEntry *subqueryEntry = NULL;
|
RangeTblEntry *subqueryEntry = NULL;
|
||||||
|
|
||||||
if (!InsertSelectQuery(originalQuery))
|
if (!InsertSelectQuery(originalQuery))
|
||||||
{
|
{
|
||||||
elog(ERROR, "Only INSERT .. SELECT queries can be modified");
|
elog(ERROR, "Only INSERT .. SELECT queries can be modified");
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot perform distributed planning for the given"
|
||||||
|
" modification")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* TODO: once CTEs are present, this does not work */
|
||||||
subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1);
|
subqueryEntry = (RangeTblEntry *) list_nth(originalQuery->rtable, 1);
|
||||||
subquery = subqueryEntry->subquery;
|
subquery = subqueryEntry->subquery;
|
||||||
|
|
||||||
AddHiddenParameterToFirstTableRecursively(subquery);
|
referencedSubquery = FirstQueryReferencingDistributedTable(subquery);
|
||||||
|
|
||||||
|
AddHiddenParameterToFirstTable(referencedSubquery);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static Query *
|
||||||
|
FirstQueryReferencingDistributedTable(Query *query)
|
||||||
|
{
|
||||||
|
List *queryList = NIL;
|
||||||
|
ListCell *queryCell = NULL;
|
||||||
|
Query *subquery = NULL;
|
||||||
|
|
||||||
|
ExtractQueryWalker((Node *) query, &queryList);
|
||||||
|
|
||||||
|
/* iterate and find the query which references to distributed table */
|
||||||
|
foreach(queryCell, queryList)
|
||||||
|
{
|
||||||
|
Query *innerSubquery = (Query *) lfirst(queryCell);
|
||||||
|
|
||||||
|
List *rangeTableList = NIL;
|
||||||
|
ListCell *rangeTableCell = NULL;
|
||||||
|
|
||||||
|
/* extract range table entries */
|
||||||
|
ExtractRangeTableEntryWalker((Node *) innerSubquery, &rangeTableList);
|
||||||
|
foreach(rangeTableCell, rangeTableList)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||||
|
|
||||||
|
if (IsDistributedTable(rangeTableEntry->relid))
|
||||||
|
{
|
||||||
|
subquery = innerSubquery;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return subquery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AddHiddenParameterToFirstTableRecursively adds a hidden parameter
|
* AddHiddenParameterToFirstTable adds a hidden parameter
|
||||||
* ($1 = partitionColumn) for the first table on the query.
|
* ($1 = partitionColumn) for the first table on the query.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AddHiddenParameterToFirstTableRecursively(Query *query)
|
AddHiddenParameterToFirstTable(Query *query)
|
||||||
{
|
{
|
||||||
Param *hiddenParam = makeNode(Param);
|
Param *hiddenParam = makeNode(Param);
|
||||||
Node *hiddenBound = NULL;
|
Node *hiddenBound = NULL;
|
||||||
|
@ -537,9 +649,6 @@ AddHiddenParameterToFirstTableRecursively(Query *query)
|
||||||
Oid greaterOperator = InvalidOid;
|
Oid greaterOperator = InvalidOid;
|
||||||
bool hashable = false;
|
bool hashable = false;
|
||||||
|
|
||||||
List *subqueryEntryList = NIL;
|
|
||||||
ListCell *rangeTableEntryCell = NULL;
|
|
||||||
|
|
||||||
AssertArg(query->commandType == CMD_SELECT);
|
AssertArg(query->commandType == CMD_SELECT);
|
||||||
|
|
||||||
hiddenParam->paramkind = PARAM_EXTERN;
|
hiddenParam->paramkind = PARAM_EXTERN;
|
||||||
|
@ -574,17 +683,6 @@ AddHiddenParameterToFirstTableRecursively(Query *query)
|
||||||
query->jointree->quals = make_and_qual(query->jointree->quals,
|
query->jointree->quals = make_and_qual(query->jointree->quals,
|
||||||
hiddenBound);
|
hiddenBound);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* recursively do same addition for subqueries of this query */
|
|
||||||
subqueryEntryList = SubqueryEntryList(query);
|
|
||||||
foreach(rangeTableEntryCell, subqueryEntryList)
|
|
||||||
{
|
|
||||||
RangeTblEntry *rangeTableEntry =
|
|
||||||
(RangeTblEntry *) lfirst(rangeTableEntryCell);
|
|
||||||
|
|
||||||
Query *innerSubquery = rangeTableEntry->subquery;
|
|
||||||
AddHiddenParameterToFirstTableRecursively(innerSubquery);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1205,7 +1303,7 @@ RouterModifyTask(Query *originalQuery, Query *query)
|
||||||
rangeTableEntry = linitial(originalQuery->rtable);
|
rangeTableEntry = linitial(originalQuery->rtable);
|
||||||
if (rangeTableEntry->alias == NULL)
|
if (rangeTableEntry->alias == NULL)
|
||||||
{
|
{
|
||||||
Alias *alias = makeAlias(UPSERT_ALIAS, NIL);
|
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||||
rangeTableEntry->alias = alias;
|
rangeTableEntry->alias = alias;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1494,7 +1592,8 @@ RouterSelectTask(Query *originalQuery, Query *query,
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->dependedTaskList = NIL;
|
task->dependedTaskList = NIL;
|
||||||
task->upsertQuery = upsertQuery;
|
task->upsertQuery = upsertQuery;
|
||||||
//task->requiresMasterEvaluation = false;
|
|
||||||
|
/* task->requiresMasterEvaluation = false; */
|
||||||
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
@ -1622,11 +1721,6 @@ TargetShardIntervalsForSelect(Query *query,
|
||||||
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
|
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
|
||||||
bool whereFalseQuery = false;
|
bool whereFalseQuery = false;
|
||||||
|
|
||||||
/* elog(DEBUG2, "relation id: %d", relationId); */
|
|
||||||
/* elog(DEBUG2, "restrictClauseList-: %s", pretty_format_node_dump(nodeToString(restrictClauseList))); */
|
|
||||||
/* elog(DEBUG2, "join info-: %s", pretty_format_node_dump(nodeToString(relationRestriction->relOptInfo->joininfo))); */
|
|
||||||
|
|
||||||
|
|
||||||
relationRestriction->prunedShardIntervalList = NIL;
|
relationRestriction->prunedShardIntervalList = NIL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2021,6 +2115,8 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#include "nodes/print.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT
|
* ReorderInsertSelectTargetListsIfExists reorders the target lists of INSERT/SELECT
|
||||||
* query which is required for deparsing purposes. The reordered query is returned.
|
* query which is required for deparsing purposes. The reordered query is returned.
|
||||||
|
@ -2288,7 +2384,7 @@ ReplaceHiddenParameter(Node *node, void *context)
|
||||||
{
|
{
|
||||||
param = (Param *) leftop;
|
param = (Param *) leftop;
|
||||||
}
|
}
|
||||||
else if (IsA(rightop, Param)) //IsA(leftop, Var))/* &&) */
|
else if (IsA(rightop, Param)) /* IsA(leftop, Var))/ * &&) * / */
|
||||||
{
|
{
|
||||||
param = (Param *) rightop;
|
param = (Param *) rightop;
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ typedef struct XactShardConnSet
|
||||||
extern bool AllModificationsCommutative;
|
extern bool AllModificationsCommutative;
|
||||||
|
|
||||||
|
|
||||||
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
|
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags);
|
||||||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
#define HIDDEN_PARAMETER_ID 0xdeadbeef
|
#define HIDDEN_PARAMETER_ID 0xdeadbeef
|
||||||
|
|
||||||
/* reserved alias name for UPSERTs */
|
/* reserved alias name for UPSERTs */
|
||||||
#define UPSERT_ALIAS "citus_table_alias"
|
#define CITUS_TABLE_ALIAS "citus_table_alias"
|
||||||
|
|
||||||
|
|
||||||
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
||||||
|
|
|
@ -0,0 +1,736 @@
|
||||||
|
--
|
||||||
|
-- MULTI_INSERT_SELECT
|
||||||
|
--
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000;
|
||||||
|
CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
|
||||||
|
SELECT master_create_distributed_table('raw_events_first', 'user_id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('raw_events_first', 4, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
|
||||||
|
SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('raw_events_second', 4, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg));
|
||||||
|
SELECT master_create_distributed_table('agg_events', 'user_id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('agg_events', 4, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- make tables as co-located
|
||||||
|
UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events');
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(1, now(), 10, 100, 1000.1, 10000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(2, now(), 20, 200, 2000.1, 20000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(3, now(), 30, 300, 3000.1, 30000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(4, now(), 40, 400, 4000.1, 40000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(5, now(), 50, 500, 5000.1, 50000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(6, now(), 60, 600, 6000.1, 60000);
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
-- raw table to raw table
|
||||||
|
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300000 raw_events_first
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300001 raw_events_first
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300002 raw_events_first
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300003 raw_events_first
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300007
|
||||||
|
DEBUG: sent COMMIT over connection 13300007
|
||||||
|
DEBUG: sent COMMIT over connection 13300005
|
||||||
|
DEBUG: sent COMMIT over connection 13300005
|
||||||
|
DEBUG: sent COMMIT over connection 13300006
|
||||||
|
DEBUG: sent COMMIT over connection 13300006
|
||||||
|
DEBUG: sent COMMIT over connection 13300004
|
||||||
|
DEBUG: sent COMMIT over connection 13300004
|
||||||
|
-- see that our first multi shard INSERT...SELECT works expected
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: ProcessUtility
|
||||||
|
SELECT
|
||||||
|
raw_events_first.user_id
|
||||||
|
FROM
|
||||||
|
raw_events_first, raw_events_second
|
||||||
|
WHERE
|
||||||
|
raw_events_first.user_id = raw_events_second.user_id;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
1
|
||||||
|
5
|
||||||
|
3
|
||||||
|
4
|
||||||
|
6
|
||||||
|
2
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
-- see that we get unique vialitons
|
||||||
|
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
|
||||||
|
ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300004"
|
||||||
|
DETAIL: Key (user_id, value_1)=(1, 10) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
-- add one more row
|
||||||
|
INSERT INTO raw_events_first (user_id, time) VALUES
|
||||||
|
(7, now());
|
||||||
|
-- try a single shard query
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, "time") SELECT user_id, "time" FROM public.raw_events_first_13300001 raw_events_first WHERE (user_id = 7)
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: ProcessUtility
|
||||||
|
-- add one more row
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(8, now(), 80, 800, 8000, 80000);
|
||||||
|
-- reorder columns
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
|
||||||
|
SELECT
|
||||||
|
value_2, value_1, value_3, value_4, user_id, time
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
user_id = 8;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_first_13300000 raw_events_first WHERE (user_id = 8)
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
-- add one more row
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: ProcessUtility
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(9, now(), 90, 900, 9000, 90000);
|
||||||
|
-- show that RETURNING also works
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
INSERT INTO raw_events_second (user_id, value_1, value_3)
|
||||||
|
SELECT
|
||||||
|
user_id, value_1, value_3
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
value_3 = 9000
|
||||||
|
RETURNING *;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300000 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300001 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300002 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300003 raw_events_first WHERE (value_3 = (9000)::double precision) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300007
|
||||||
|
DEBUG: sent COMMIT over connection 13300007
|
||||||
|
DEBUG: sent COMMIT over connection 13300005
|
||||||
|
DEBUG: sent COMMIT over connection 13300005
|
||||||
|
DEBUG: sent COMMIT over connection 13300006
|
||||||
|
DEBUG: sent COMMIT over connection 13300006
|
||||||
|
DEBUG: sent COMMIT over connection 13300004
|
||||||
|
DEBUG: sent COMMIT over connection 13300004
|
||||||
|
user_id | time | value_1 | value_2 | value_3 | value_4
|
||||||
|
---------+------+---------+---------+---------+---------
|
||||||
|
9 | | 90 | | 9000 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- hits two shards
|
||||||
|
INSERT INTO raw_events_second (user_id, value_1, value_3)
|
||||||
|
SELECT
|
||||||
|
user_id, value_1, value_3
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
user_id = 9 OR user_id = 16
|
||||||
|
RETURNING *;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300001 raw_events_first WHERE ((user_id = 9) OR (user_id = 16)) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: skipping the task
|
||||||
|
DETAIL: Insert query hits 2 placements, Select query hits 1 placements and only 1 of those placements match.
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id, value_1, value_3) SELECT user_id, value_1, value_3 FROM public.raw_events_first_13300003 raw_events_first WHERE ((user_id = 9) OR (user_id = 16)) RETURNING citus_table_alias.user_id, citus_table_alias."time", citus_table_alias.value_1, citus_table_alias.value_2, citus_table_alias.value_3, citus_table_alias.value_4
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
ERROR: duplicate key value violates unique constraint "raw_events_second_user_id_value_1_key_13300007"
|
||||||
|
DETAIL: Key (user_id, value_1)=(9, 90) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
-- now do some aggregations
|
||||||
|
INSERT INTO agg_events
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4)
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
GROUP BY
|
||||||
|
user_id;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_2_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, avg(value_2) AS avg, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
-- group by column not exists on the SELECT target list
|
||||||
|
-- TODO: there is a bug on RETURNING
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
|
||||||
|
SELECT
|
||||||
|
sum(value_3), count(value_4), sum(value_1), user_id
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
GROUP BY
|
||||||
|
value_2, user_id
|
||||||
|
RETURNING *;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300000 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300001 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300002 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_3_agg, value_4_agg) SELECT user_id, sum(value_1) AS sum, sum(value_3) AS sum, count(value_4) AS count FROM public.raw_events_first_13300003 raw_events_first GROUP BY value_2, user_id RETURNING citus_table_alias.user_id, citus_table_alias.value_1_agg, citus_table_alias.value_2_agg, citus_table_alias.value_3_agg, citus_table_alias.value_4_agg, citus_table_alias.agg_time
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008"
|
||||||
|
DETAIL: Key (user_id, value_1_agg)=(8, 80) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
-- some subquery tests
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT SUM(value_1),
|
||||||
|
id
|
||||||
|
FROM (SELECT raw_events_second.user_id AS id,
|
||||||
|
raw_events_second.value_1
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo
|
||||||
|
GROUP BY id;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300004 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300005 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT id, sum(value_1) AS sum FROM (SELECT raw_events_second.user_id AS id, raw_events_second.value_1 FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id)) foo GROUP BY id
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008"
|
||||||
|
DETAIL: Key (user_id, value_1_agg)=(1, 10) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
-- subquery one more level depth
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.user_id AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.user_id) AS foo;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300004 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300005 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300007
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300006 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300004
|
||||||
|
DEBUG: predicate pruning for shardId 13300005
|
||||||
|
DEBUG: predicate pruning for shardId 13300006
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg, value_4_agg) SELECT id, v1, v4 FROM (SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.user_id AS id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (raw_events_first.user_id = raw_events_second.user_id) GROUP BY raw_events_second.user_id) foo
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
ERROR: duplicate key value violates unique constraint "agg_events_user_id_value_1_agg_key_13300008"
|
||||||
|
DETAIL: Key (user_id, value_1_agg)=(1, 10) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
-- some UPSERTS
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET agg_time = EXCLUDED.agg_time
|
||||||
|
WHERE ae.agg_time < EXCLUDED.agg_time;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300000 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time)
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300001 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time)
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300002 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time)
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300003 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time)
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
-- upserts with returning
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET agg_time = EXCLUDED.agg_time
|
||||||
|
WHERE ae.agg_time < EXCLUDED.agg_time
|
||||||
|
RETURNING user_id, value_1_agg;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300000 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300001 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300002 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS ae (user_id, value_1_agg, agg_time) SELECT user_id, value_1, "time" FROM public.raw_events_first_13300003 raw_events_first ON CONFLICT(user_id, value_1_agg) DO UPDATE SET agg_time = excluded.agg_time WHERE (ae.agg_time < excluded.agg_time) RETURNING ae.user_id, ae.value_1_agg
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
7 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- TODO:: add hll and date_trunc
|
||||||
|
INSERT INTO agg_events (user_id, value_1_agg)
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1 + value_2)
|
||||||
|
FROM
|
||||||
|
raw_events_first GROUP BY user_id;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) AS sum FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
-- FILTER CLAUSE
|
||||||
|
INSERT INTO agg_events (user_id, value_1_agg)
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1 + value_2) FILTER (where value_3 = 15)
|
||||||
|
FROM
|
||||||
|
raw_events_first GROUP BY user_id;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300000 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300001 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300002 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: predicate pruning for shardId 13300000
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT user_id, sum((value_1 + value_2)) FILTER (WHERE (value_3 = (15)::double precision)) AS sum FROM public.raw_events_first_13300003 raw_events_first GROUP BY user_id
|
||||||
|
DEBUG: ProcessQuery
|
||||||
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: CommitTransactionCommand
|
||||||
|
DEBUG: CommitTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300008
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300011
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300009
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
DEBUG: sent COMMIT over connection 13300010
|
||||||
|
-- TODO: UUIDs
|
||||||
|
-- unsupported JOIN
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.user_id AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id != raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.user_id) AS foo;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
DEBUG: predicate pruning for shardId 13300001
|
||||||
|
DEBUG: predicate pruning for shardId 13300002
|
||||||
|
DEBUG: predicate pruning for shardId 13300003
|
||||||
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
|
-- INSERT partition column does not match with SELECT partition column
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.value_3 AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.value_3) AS foo;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
-- error cases
|
||||||
|
-- no part column at all
|
||||||
|
INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2;
|
||||||
|
DEBUG: StartTransactionCommand
|
||||||
|
DEBUG: StartTransaction
|
||||||
|
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
|
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
|
|
@ -29,6 +29,8 @@ test: multi_create_table_constraints
|
||||||
test: multi_master_protocol
|
test: multi_master_protocol
|
||||||
test: multi_load_data
|
test: multi_load_data
|
||||||
|
|
||||||
|
test: multi_insert_select
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# Miscellaneous tests to check our query planning behavior
|
# Miscellaneous tests to check our query planning behavior
|
||||||
# ----------
|
# ----------
|
||||||
|
|
|
@ -0,0 +1,256 @@
|
||||||
|
--
|
||||||
|
-- MULTI_INSERT_SELECT
|
||||||
|
--
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 13300000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 13300000;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE raw_events_first (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
|
||||||
|
SELECT master_create_distributed_table('raw_events_first', 'user_id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('raw_events_first', 4, 2);
|
||||||
|
|
||||||
|
CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint, UNIQUE(user_id, value_1));
|
||||||
|
SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('raw_events_second', 4, 2);
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg));
|
||||||
|
SELECT master_create_distributed_table('agg_events', 'user_id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('agg_events', 4, 2);
|
||||||
|
|
||||||
|
-- make tables as co-located
|
||||||
|
UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events');
|
||||||
|
|
||||||
|
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(1, now(), 10, 100, 1000.1, 10000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(2, now(), 20, 200, 2000.1, 20000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(3, now(), 30, 300, 3000.1, 30000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(4, now(), 40, 400, 4000.1, 40000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(5, now(), 50, 500, 5000.1, 50000);
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(6, now(), 60, 600, 6000.1, 60000);
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
|
||||||
|
-- raw table to raw table
|
||||||
|
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
|
||||||
|
|
||||||
|
-- see that our first multi shard INSERT...SELECT works expected
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
SELECT
|
||||||
|
raw_events_first.user_id
|
||||||
|
FROM
|
||||||
|
raw_events_first, raw_events_second
|
||||||
|
WHERE
|
||||||
|
raw_events_first.user_id = raw_events_second.user_id;
|
||||||
|
|
||||||
|
-- see that we get unique vialitons
|
||||||
|
INSERT INTO raw_events_second SELECT * FROM raw_events_first;
|
||||||
|
|
||||||
|
-- add one more row
|
||||||
|
INSERT INTO raw_events_first (user_id, time) VALUES
|
||||||
|
(7, now());
|
||||||
|
|
||||||
|
-- try a single shard query
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
INSERT INTO raw_events_second (user_id, time) SELECT user_id, time FROM raw_events_first WHERE user_id = 7;
|
||||||
|
|
||||||
|
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
|
||||||
|
-- add one more row
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(8, now(), 80, 800, 8000, 80000);
|
||||||
|
|
||||||
|
|
||||||
|
-- reorder columns
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
INSERT INTO raw_events_second (value_2, value_1, value_3, value_4, user_id, time)
|
||||||
|
SELECT
|
||||||
|
value_2, value_1, value_3, value_4, user_id, time
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
user_id = 8;
|
||||||
|
|
||||||
|
|
||||||
|
-- add one more row
|
||||||
|
SET client_min_messages TO INFO;
|
||||||
|
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
|
||||||
|
(9, now(), 90, 900, 9000, 90000);
|
||||||
|
|
||||||
|
|
||||||
|
-- show that RETURNING also works
|
||||||
|
SET client_min_messages TO DEBUG4;
|
||||||
|
INSERT INTO raw_events_second (user_id, value_1, value_3)
|
||||||
|
SELECT
|
||||||
|
user_id, value_1, value_3
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
value_3 = 9000
|
||||||
|
RETURNING *;
|
||||||
|
|
||||||
|
-- hits two shards
|
||||||
|
INSERT INTO raw_events_second (user_id, value_1, value_3)
|
||||||
|
SELECT
|
||||||
|
user_id, value_1, value_3
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
WHERE
|
||||||
|
user_id = 9 OR user_id = 16
|
||||||
|
RETURNING *;
|
||||||
|
|
||||||
|
|
||||||
|
-- now do some aggregations
|
||||||
|
INSERT INTO agg_events
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1), avg(value_2), sum(value_3), count(value_4)
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
GROUP BY
|
||||||
|
user_id;
|
||||||
|
|
||||||
|
-- group by column not exists on the SELECT target list
|
||||||
|
-- TODO: there is a bug on RETURNING
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
|
||||||
|
SELECT
|
||||||
|
sum(value_3), count(value_4), sum(value_1), user_id
|
||||||
|
FROM
|
||||||
|
raw_events_first
|
||||||
|
GROUP BY
|
||||||
|
value_2, user_id
|
||||||
|
RETURNING *;
|
||||||
|
|
||||||
|
|
||||||
|
-- some subquery tests
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT SUM(value_1),
|
||||||
|
id
|
||||||
|
FROM (SELECT raw_events_second.user_id AS id,
|
||||||
|
raw_events_second.value_1
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id) AS foo
|
||||||
|
GROUP BY id;
|
||||||
|
|
||||||
|
|
||||||
|
-- subquery one more level depth
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.user_id AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.user_id) AS foo;
|
||||||
|
|
||||||
|
|
||||||
|
-- some UPSERTS
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET agg_time = EXCLUDED.agg_time
|
||||||
|
WHERE ae.agg_time < EXCLUDED.agg_time;
|
||||||
|
|
||||||
|
-- upserts with returning
|
||||||
|
INSERT INTO agg_events AS ae
|
||||||
|
(
|
||||||
|
user_id,
|
||||||
|
value_1_agg,
|
||||||
|
agg_time
|
||||||
|
)
|
||||||
|
SELECT user_id,
|
||||||
|
value_1,
|
||||||
|
time
|
||||||
|
FROM raw_events_first
|
||||||
|
ON conflict (user_id, value_1_agg)
|
||||||
|
DO UPDATE
|
||||||
|
SET agg_time = EXCLUDED.agg_time
|
||||||
|
WHERE ae.agg_time < EXCLUDED.agg_time
|
||||||
|
RETURNING user_id, value_1_agg;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
-- TODO:: add hll and date_trunc
|
||||||
|
INSERT INTO agg_events (user_id, value_1_agg)
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1 + value_2)
|
||||||
|
FROM
|
||||||
|
raw_events_first GROUP BY user_id;
|
||||||
|
|
||||||
|
-- FILTER CLAUSE
|
||||||
|
INSERT INTO agg_events (user_id, value_1_agg)
|
||||||
|
SELECT
|
||||||
|
user_id, sum(value_1 + value_2) FILTER (where value_3 = 15)
|
||||||
|
FROM
|
||||||
|
raw_events_first GROUP BY user_id;
|
||||||
|
|
||||||
|
|
||||||
|
-- TODO: UUIDs
|
||||||
|
|
||||||
|
-- unsupported JOIN
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.user_id AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id != raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.user_id) AS foo;
|
||||||
|
|
||||||
|
|
||||||
|
-- INSERT partition column does not match with SELECT partition column
|
||||||
|
INSERT INTO agg_events
|
||||||
|
(value_4_agg,
|
||||||
|
value_1_agg,
|
||||||
|
user_id)
|
||||||
|
SELECT v4,
|
||||||
|
v1,
|
||||||
|
id
|
||||||
|
FROM (SELECT SUM(raw_events_second.value_4) AS v4,
|
||||||
|
SUM(raw_events_first.value_1) AS v1,
|
||||||
|
raw_events_second.value_3 AS id
|
||||||
|
FROM raw_events_first,
|
||||||
|
raw_events_second
|
||||||
|
WHERE raw_events_first.user_id = raw_events_second.user_id
|
||||||
|
GROUP BY raw_events_second.value_3) AS foo;
|
||||||
|
|
||||||
|
-- error cases
|
||||||
|
-- no part column at all
|
||||||
|
INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first;
|
||||||
|
INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first;
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first;
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first;
|
||||||
|
INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first;
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id;
|
||||||
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2;
|
Loading…
Reference in New Issue