apply some of the reviews

pull/896/head
Onder Kalaci 2016-10-11 11:19:03 +03:00
parent 1f74364b23
commit 916d8ef406
8 changed files with 655 additions and 405 deletions

View File

@ -130,8 +130,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static Var * MakeInt4Column(void);
static Const * MakeInt4Constant(Datum constantValue);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
static List * BuildRestrictInfoList(List *qualList);
static List * FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
@ -3015,7 +3013,7 @@ MakeHashedOperatorExpression(OpExpr *operatorExpression)
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
*/
static Var *
Var *
MakeInt4Column()
{
Index tableId = 0;
@ -3035,7 +3033,7 @@ MakeInt4Column()
* MakeInt4Constant creates a new constant of int4 type and assigns the given
* value as a constant value.
*/
static Const *
Const *
MakeInt4Constant(Datum constantValue)
{
Oid constantType = INT4OID;

View File

@ -62,22 +62,22 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/*
* We implement INSERT INTO .. SELECT by pushing down the SELECT to
* each shard. That requires that the SELECT is co-located with the
* target table. To compute that we use the router planner, by adding
* each shard. To compute that we use the router planner, by adding
* a "hidden" constraint that the partition column be equal to a
* certain value. standard_planner() distributes that constraint to
* all affected table's baserestrictinfos. The router planner then
* iterates over the target table's shards, for each we replace the
* "hidden" restriction, with one that PruneShardList() handles, and
* then generate a query for that individual shard. If any of the
* involved tables don't prune down to a single shard, or if the
* pruned shards aren't colocated, we error out.
* the baserestrictinfos of the tables that are connected
* via equi joins.
*
* TODO: we currently do not support CTEs.
* The router planner then iterates over the target table's shards,
* for each we replace the "hidden" restriction, with one that
* PruneShardList() handles, and then generate a query for that
* individual shard. If any of the involved tables don't prune down
* to a single shard, or if the pruned shards aren't colocated,
* we error out.
*/
if (InsertSelectQuery(parse) && parse->cteList == NULL)
if (InsertSelectQuery(parse))
{
AddHiddenPartitionColumnParameter(parse);
AddHiddenPartitionColumnEqualityQual(parse);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -59,7 +59,16 @@ deparse_shard_query_test(PG_FUNCTION_ARGS)
Query *query = lfirst(queryTreeCell);
StringInfo buffer = makeStringInfo();
ReorderInsertSelectTargetListsIfExists(query);
/* reorder the target list only for INSERT .. SELECT queries */
if (InsertSelectQuery(query))
{
RangeTblEntry *insertRte = linitial(query->rtable);
RangeTblEntry *subqueryRte = lsecond(query->rtable);
ReorderInsertSelectTargetLists(query, insertRte, subqueryRte);
}
deparse_shard_query(query, InvalidOid, 0, buffer);
elog(INFO, "query: %s", buffer->data);

View File

@ -245,9 +245,10 @@ extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval)
extern bool SimpleOpExpression(Expr *clause);
extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn);
/* helper functions */
extern Var * MakeInt4Column(void);
extern Const * MakeInt4Constant(Datum constantValue);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
/* Function declarations for sorting shards. */
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);

View File

@ -31,9 +31,11 @@
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
extern void AddHiddenPartitionColumnParameter(Query *originalQuery);
extern void AddHiddenPartitionColumnEqualityQual(Query *originalQuery);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
extern Query * ReorderInsertSelectTargetListsIfExists(Query *originalQuery);
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte);
extern bool InsertSelectQuery(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -44,6 +44,19 @@ SELECT master_create_worker_shards('agg_events', 4, 2);
-- make tables as co-located
UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events');
CREATE TABLE reference_table (user_id int);
SELECT master_create_distributed_table('reference_table', 'user_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('reference_table', 1, 2);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(1, now(), 10, 100, 1000.1, 10000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
@ -654,7 +667,86 @@ DEBUG: sent COMMIT over connection 13300009
DEBUG: sent COMMIT over connection 13300009
DEBUG: sent COMMIT over connection 13300010
DEBUG: sent COMMIT over connection 13300010
-- a very simple UNION query
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- same query with slightly different syntax, but this time we cannot push it down
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_first);
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- similar query with a filter on two of the queries
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first WHERE user_id = 15) UNION
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: cannot perform distributed planning for the given modification
DETAIL: Set operations are not allowed in INSERT ... SELECT queries
-- TODO: UUIDs
-- a test with reference table JOINs
INSERT INTO
agg_events (user_id, value_1_agg)
SELECT
raw_events_first.user_id, sum(value_1)
FROM
reference_table, raw_events_first
WHERE
raw_events_first.user_id = reference_table.user_id
GROUP BY
raw_events_first.user_id;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300000 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300001 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300003
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300002 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id
DEBUG: predicate pruning for shardId 13300000
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT raw_events_first.user_id, sum(raw_events_first.value_1) AS sum FROM public.reference_table_13300012 reference_table, public.raw_events_first_13300003 raw_events_first WHERE (raw_events_first.user_id = reference_table.user_id) GROUP BY raw_events_first.user_id
DEBUG: ProcessQuery
DEBUG: Plan is router executable
DEBUG: CommitTransactionCommand
DEBUG: CommitTransaction
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
DEBUG: sent COMMIT over connection 13300008
DEBUG: sent COMMIT over connection 13300008
DEBUG: sent COMMIT over connection 13300011
DEBUG: sent COMMIT over connection 13300011
DEBUG: sent COMMIT over connection 13300009
DEBUG: sent COMMIT over connection 13300009
DEBUG: sent COMMIT over connection 13300010
DEBUG: sent COMMIT over connection 13300010
-- unsupported JOIN
INSERT INTO agg_events
(value_4_agg,
@ -673,9 +765,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
DEBUG: predicate pruning for shardId 13300001
DEBUG: predicate pruning for shardId 13300002
DEBUG: predicate pruning for shardId 13300003
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- INSERT partition column does not match with SELECT partition column
@ -699,38 +788,84 @@ DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
-- error cases
-- no part column at all
INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first;
INSERT INTO raw_events_second
(value_1)
SELECT value_1
FROM raw_events_first;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first;
INSERT INTO raw_events_second
(value_1)
SELECT user_id
FROM raw_events_first;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT value_1
FROM raw_events_first;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id * 2
FROM raw_events_first;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id :: bigint
FROM raw_events_first;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
value_2
FROM raw_events_first
GROUP BY user_id,
value_2;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: SELECT query should return bare partition column on the same ordinal position with INSERT query's partition column
-- tables should be co-located
INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table;
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: INSERT target table and the source relation of the SELECT partition column value must be colocated

View File

@ -14,7 +14,6 @@ CREATE TABLE raw_events_second (user_id int, time timestamp, value_1 int, value_
SELECT master_create_distributed_table('raw_events_second', 'user_id', 'hash');
SELECT master_create_worker_shards('raw_events_second', 4, 2);
CREATE TABLE agg_events (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp, UNIQUE(user_id, value_1_agg));
SELECT master_create_distributed_table('agg_events', 'user_id', 'hash');
SELECT master_create_worker_shards('agg_events', 4, 2);
@ -23,6 +22,10 @@ SELECT master_create_worker_shards('agg_events', 4, 2);
UPDATE pg_dist_partition SET colocationid = 100000 WHERE logicalrelid IN ('raw_events_first', 'raw_events_second', 'agg_events');
CREATE TABLE reference_table (user_id int);
SELECT master_create_distributed_table('reference_table', 'user_id', 'hash');
SELECT master_create_worker_shards('reference_table', 1, 2);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
(1, now(), 10, 100, 1000.1, 10000);
INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES
@ -209,9 +212,45 @@ SELECT
FROM
raw_events_first GROUP BY user_id;
-- a very simple UNION query
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_second)) as foo;
-- same query with slightly different syntax, but this time we cannot push it down
INSERT INTO
raw_events_first(user_id)
(SELECT user_id FROM raw_events_first) UNION
(SELECT user_id FROM raw_events_first);
-- similar query with a filter on two of the queries
INSERT INTO
raw_events_first(user_id)
SELECT
user_id
FROM
((SELECT user_id FROM raw_events_first WHERE user_id = 15) UNION
(SELECT user_id FROM raw_events_second where user_id = 17)) as foo;
-- TODO: UUIDs
-- a test with reference table JOINs
INSERT INTO
agg_events (user_id, value_1_agg)
SELECT
raw_events_first.user_id, sum(value_1)
FROM
reference_table, raw_events_first
WHERE
raw_events_first.user_id = reference_table.user_id
GROUP BY
raw_events_first.user_id;
-- unsupported JOIN
INSERT INTO agg_events
(value_4_agg,
@ -247,10 +286,59 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
-- error cases
-- no part column at all
INSERT INTO raw_events_second (value_1) SELECT value_1 FROM raw_events_first;
INSERT INTO raw_events_second (value_1) SELECT user_id FROM raw_events_first;
INSERT INTO raw_events_second (user_id) SELECT value_1 FROM raw_events_first;
INSERT INTO raw_events_second (user_id) SELECT user_id * 2 FROM raw_events_first;
INSERT INTO raw_events_second (user_id) SELECT user_id::bigint FROM raw_events_first;
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), avg(value_2) FROM raw_events_first GROUP BY user_id;
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, value_2_agg, user_id) SELECT sum(value_3), count(value_4), user_id, sum(value_1), value_2 FROM raw_events_first GROUP BY user_id, value_2;
INSERT INTO raw_events_second
(value_1)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(value_1)
SELECT user_id
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT value_1
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id * 2
FROM raw_events_first;
INSERT INTO raw_events_second
(user_id)
SELECT user_id :: bigint
FROM raw_events_first;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
Avg(value_2)
FROM raw_events_first
GROUP BY user_id;
INSERT INTO agg_events
(value_3_agg,
value_4_agg,
value_1_agg,
value_2_agg,
user_id)
SELECT SUM(value_3),
Count(value_4),
user_id,
SUM(value_1),
value_2
FROM raw_events_first
GROUP BY user_id,
value_2;
-- tables should be co-located
INSERT INTO agg_events (user_id) SELECT user_id FROM reference_table;