pull/341/merge
Murat Tuncer 2016-02-16 10:25:04 +00:00
commit 7d0e99fe1a
12 changed files with 81 additions and 70 deletions

View File

@ -250,7 +250,7 @@ ExecuteDistributedModify(Task *task)
Assert(taskPlacement->shardState == FILE_FINALIZED);
connection = GetConnection(nodeName, nodePort);
connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
failedPlacementList = lappend(failedPlacementList, taskPlacement);
@ -383,7 +383,7 @@ ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor,
bool queryOK = false;
bool storedOK = false;
PGconn *connection = GetConnection(nodeName, nodePort);
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
continue;

View File

@ -121,16 +121,6 @@ ErrorIfQueryNotSupported(Query *queryTree)
Assert(commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE);
if (!(partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given"
" modification"),
errdetail("Only hash- or range-partitioned tables may be the "
"target of distributed modifications")));
}
/*
* Reject subqueries which are in SELECT or WHERE clause.
* Queries which include subqueries in FROM clauses are rejected below.

View File

@ -2808,6 +2808,10 @@ SimpleOpExpression(Expr *clause)
return false; /* not a binary opclause */
}
/* strip coercions before doing check */
leftOperand = strip_implicit_coercions(leftOperand);
rightOperand = strip_implicit_coercions(rightOperand);
if (IsA(rightOperand, Const) && IsA(leftOperand, Var))
{
constantClause = (Const *) rightOperand;
@ -2919,6 +2923,10 @@ OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
Node *rightOperand = get_rightop((Expr *) operatorExpression);
Var *column = NULL;
/* strip coercions before doing check */
leftOperand = strip_implicit_coercions(leftOperand);
rightOperand = strip_implicit_coercions(rightOperand);
if (IsA(leftOperand, Var))
{
column = (Var *) leftOperand;

View File

@ -52,19 +52,10 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (NeedsDistributedPlanning(parse))
{
MemoryContext oldcontext = NULL;
MultiPlan *physicalPlan = NULL;
/* Switch to top level message context */
oldcontext = MemoryContextSwitchTo(MessageContext);
physicalPlan = CreatePhysicalPlan(parse);
MultiPlan *physicalPlan = CreatePhysicalPlan(parse);
/* store required data into the planned statement */
result = MultiQueryContainerNode(result, physicalPlan);
/* Now switch back to original context */
MemoryContextSwitchTo(oldcontext);
}
return result;

View File

@ -48,7 +48,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
int32 nodePort = PG_GETARG_INT32(1);
PGresult *result = NULL;
PGconn *connection = GetConnection(nodeName, nodePort);
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
PG_RETURN_BOOL(false);
@ -79,7 +79,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
Datum count = Int32GetDatum(-1);
PGresult *result = NULL;
PGconn *connection = GetConnection(nodeName, nodePort);
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
PG_RETURN_DATUM(count);
@ -114,7 +114,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS)
char *nodeName = PG_GETARG_CSTRING(0);
int32 nodePort = PG_GETARG_INT32(1);
PGconn *connection = GetConnection(nodeName, nodePort);
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
PG_RETURN_BOOL(false);
@ -136,7 +136,7 @@ set_connection_status_bad(PG_FUNCTION_ARGS)
char *nodeName = PG_GETARG_CSTRING(0);
int32 nodePort = PG_GETARG_INT32(1);
PGconn *connection = GetConnection(nodeName, nodePort);
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
PG_RETURN_BOOL(false);

View File

@ -32,7 +32,7 @@
/*
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
* The first call to GetConnection triggers hash creation.
* The first call to GetOrEstablishConnection triggers hash creation.
*/
static HTAB *NodeConnectionHash = NULL;
@ -44,10 +44,10 @@ static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
/*
* GetConnection returns a PGconn which can be used to execute queries on a
* remote PostgreSQL server. If no suitable connection to the specified node on
* the specified port yet exists, the function establishes a new connection and
* returns that.
* GetOrEstablishConnection returns a PGconn which can be used to execute
* queries on a remote PostgreSQL server. If no suitable connection to the
* specified node on the specified port yet exists, the function establishes
* a new connection and adds it to the connection cache before returning it.
*
* Returned connections are guaranteed to be in the CONNECTION_OK state. If the
* requested connection cannot be established, or if it was previously created
@ -56,7 +56,7 @@ static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
* This function throws an error if a hostname over 255 characters is provided.
*/
PGconn *
GetConnection(char *nodeName, int32 nodePort)
GetOrEstablishConnection(char *nodeName, int32 nodePort)
{
PGconn *connection = NULL;
NodeConnectionKey nodeConnectionKey;

View File

@ -51,7 +51,7 @@ typedef struct NodeConnectionEntry
/* function declarations for obtaining and using a connection */
extern PGconn * GetConnection(char *nodeName, int32 nodePort);
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern void ReportRemoteError(PGconn *connection, PGresult *result);

View File

@ -121,7 +121,7 @@ CREATE TABLE varchar_hash_partitioned_table
id int,
name varchar
);
SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'id', 'hash');
SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'name', 'hash');
master_create_distributed_table
---------------------------------
@ -139,16 +139,16 @@ INSERT INTO varchar_hash_partitioned_table VALUES (2, 'Ozgun');
INSERT INTO varchar_hash_partitioned_table VALUES (3, 'Onder');
INSERT INTO varchar_hash_partitioned_table VALUES (4, 'Sumedh');
INSERT INTO varchar_hash_partitioned_table VALUES (5, 'Marco');
SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Onder';
SELECT * FROM varchar_hash_partitioned_table WHERE id = 1;
id | name
----+-------
3 | Onder
1 | Jason
(1 row)
UPDATE varchar_hash_partitioned_table SET name = 'Samay' WHERE id = 5;
SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Samay';
UPDATE varchar_hash_partitioned_table SET id = 6 WHERE name = 'Jason';
SELECT * FROM varchar_hash_partitioned_table WHERE id = 6;
id | name
----+-------
5 | Samay
6 | Jason
(1 row)

View File

@ -61,10 +61,14 @@ SELECT master_create_empty_shard('range_partitioned') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999
WHERE shardid = :new_shard_id;
-- create append-partitioned shard
-- create append-partitioned shards
SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 100000
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
WHERE shardid = :new_shard_id;
-- basic single-row INSERT
INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
@ -82,7 +86,10 @@ ERROR: distributed modifications must target exactly one shard
-- try an insert to a range-partitioned table
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- ensure the value is where we put it and query to find it is properly pruned
-- also insert to an append-partitioned table
INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2';
SET citusdb.task_executor_type TO 'router';
SELECT * FROM range_partitioned WHERE id = 32743;
@ -92,17 +99,23 @@ DEBUG: predicate pruning for shardId 103070
32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69
(1 row)
SELECT * FROM append_partitioned WHERE id = 414123;
DEBUG: predicate pruning for shardId 103072
id | symbol | bidder_id | placed_at | kind | limit_price
--------+--------+-----------+--------------------------+------+-------------
414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69
(1 row)
SET client_min_messages TO DEFAULT;
SET citusdb.task_executor_type TO DEFAULT;
-- also try inserting without a range-partitioned shard to receive the value
-- try inserting without a range-partitioned shard to receive the value
INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
ERROR: distributed modifications must target exactly one shard
-- try an insert to an append-partitioned table
INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
-- and insert into an append-partitioned table with a value that spans shards:
INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
ERROR: cannot perform distributed planning for the given modification
DETAIL: Only hash- or range-partitioned tables may be the target of distributed modifications
ERROR: distributed modifications must target exactly one shard
-- INSERT with DEFAULT in the target list
INSERT INTO limit_orders VALUES (12756, 'MSFT', 10959, '2013-05-08 07:29:23', 'sell',
DEFAULT);

View File

@ -309,7 +309,7 @@ SET client_min_messages TO 'DEBUG2';
SELECT *
FROM articles
WHERE author_id = 1;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
id | author_id | title | word_count
----+-----------+--------------+------------
1 | 1 | arsenous | 9572
@ -323,7 +323,7 @@ DEBUG: predicate pruning for shardId 103093
SELECT *
FROM articles
WHERE author_id = 1 OR author_id = 17;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
id | author_id | title | word_count
----+-----------+--------------+------------
1 | 1 | arsenous | 9572
@ -343,7 +343,7 @@ HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
SELECT id as article_id, word_count * id as random_value
FROM articles
WHERE author_id = 1;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
article_id | random_value
------------+--------------
1 | 9572
@ -360,7 +360,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3;
DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
first_author | second_word_count
--------------+-------------------
@ -375,7 +375,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3;
DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with JOINs
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- do not create the master query for LIMIT on a single shard SELECT
@ -384,7 +384,7 @@ SELECT *
WHERE author_id = 1
LIMIT 2;
DEBUG: push down of limit count: 2
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
id | author_id | title | word_count
----+-----------+----------+------------
1 | 1 | arsenous | 9572
@ -398,7 +398,7 @@ SELECT id
FROM articles
WHERE author_id = 1
GROUP BY id;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
id
----
41
@ -415,7 +415,7 @@ COPY articles_single_shard TO stdout;
SELECT avg(word_count)
FROM articles
WHERE author_id = 2;
DEBUG: predicate pruning for shardId 103092
DEBUG: predicate pruning for shardId 103093
ERROR: cannot use router executor with aggregates
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- max, min, sum, count is somehow implemented
@ -424,7 +424,7 @@ SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt
FROM articles
WHERE author_id = 2;
DEBUG: predicate pruning for shardId 103092
DEBUG: predicate pruning for shardId 103093
ERROR: cannot use router executor with aggregates
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with ORDER BY
@ -432,7 +432,7 @@ SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with ORDER BY clauses
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with ORDER BY and LIMIT
@ -442,7 +442,7 @@ SELECT *
ORDER BY word_count
LIMIT 2;
DEBUG: push down of limit count: 2
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with ORDER BY clauses
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with aggregates and GROUP BY
@ -450,14 +450,14 @@ SELECT max(word_count)
FROM articles
WHERE author_id = 1
GROUP BY author_id;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with aggregates
HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with repartition jobs
SELECT *
FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1;
DEBUG: predicate pruning for shardId 103093
DEBUG: predicate pruning for shardId 103094
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3

View File

@ -104,7 +104,7 @@ CREATE TABLE varchar_hash_partitioned_table
name varchar
);
SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'id', 'hash');
SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'name', 'hash');
SELECT master_create_worker_shards('varchar_hash_partitioned_table', 4, 1);
-- execute INSERT, SELECT and UPDATE queries on varchar_hash_partitioned_table
@ -114,8 +114,8 @@ INSERT INTO varchar_hash_partitioned_table VALUES (3, 'Onder');
INSERT INTO varchar_hash_partitioned_table VALUES (4, 'Sumedh');
INSERT INTO varchar_hash_partitioned_table VALUES (5, 'Marco');
SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Onder';
SELECT * FROM varchar_hash_partitioned_table WHERE id = 1;
UPDATE varchar_hash_partitioned_table SET name = 'Samay' WHERE id = 5;
UPDATE varchar_hash_partitioned_table SET id = 6 WHERE name = 'Jason';
SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Samay';
SELECT * FROM varchar_hash_partitioned_table WHERE id = 6;

View File

@ -40,10 +40,15 @@ SELECT master_create_empty_shard('range_partitioned') AS new_shard_id
UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999
WHERE shardid = :new_shard_id;
-- create append-partitioned shard
-- create append-partitioned shards
SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 100000
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
WHERE shardid = :new_shard_id;
-- basic single-row INSERT
@ -59,19 +64,23 @@ INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:5
INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- ensure the value is where we put it and query to find it is properly pruned
-- also insert to an append-partitioned table
INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2';
SET citusdb.task_executor_type TO 'router';
SELECT * FROM range_partitioned WHERE id = 32743;
SELECT * FROM append_partitioned WHERE id = 414123;
SET client_min_messages TO DEFAULT;
SET citusdb.task_executor_type TO DEFAULT;
-- also try inserting without a range-partitioned shard to receive the value
-- try inserting without a range-partitioned shard to receive the value
INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- try an insert to an append-partitioned table
INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
-- and insert into an append-partitioned table with a value that spans shards:
INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
-- INSERT with DEFAULT in the target list