diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 5a6cdd06e..5c2f04165 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -250,7 +250,7 @@ ExecuteDistributedModify(Task *task)
 
 		Assert(taskPlacement->shardState == FILE_FINALIZED);
 
-		connection = GetConnection(nodeName, nodePort);
+		connection = GetOrEstablishConnection(nodeName, nodePort);
 		if (connection == NULL)
 		{
 			failedPlacementList = lappend(failedPlacementList, taskPlacement);
@@ -383,7 +383,7 @@ ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor,
 		bool queryOK = false;
 		bool storedOK = false;
 
-		PGconn *connection = GetConnection(nodeName, nodePort);
+		PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
 		if (connection == NULL)
 		{
 			continue;
diff --git a/src/backend/distributed/planner/modify_planner.c b/src/backend/distributed/planner/modify_planner.c
index 15a08ea16..faeb6b6b0 100644
--- a/src/backend/distributed/planner/modify_planner.c
+++ b/src/backend/distributed/planner/modify_planner.c
@@ -121,16 +121,6 @@ ErrorIfQueryNotSupported(Query *queryTree)
 	Assert(commandType == CMD_INSERT || commandType == CMD_UPDATE ||
 		   commandType == CMD_DELETE);
 
-	if (!(partitionMethod == DISTRIBUTE_BY_HASH ||
-		  partitionMethod == DISTRIBUTE_BY_RANGE))
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("cannot perform distributed planning for the given"
-							   " modification"),
-						errdetail("Only hash- or range-partitioned tables may be the "
-								  "target of distributed modifications")));
-	}
-
 	/*
 	 * Reject subqueries which are in SELECT or WHERE clause.
 	 * Queries which include subqueries in FROM clauses are rejected below.
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 1b4172d3a..5e497970e 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -2808,6 +2808,10 @@ SimpleOpExpression(Expr *clause)
 		return false; /* not a binary opclause */
 	}
 
+	/* strip coercions before doing check */
+	leftOperand = strip_implicit_coercions(leftOperand);
+	rightOperand = strip_implicit_coercions(rightOperand);
+
 	if (IsA(rightOperand, Const) && IsA(leftOperand, Var))
 	{
 		constantClause = (Const *) rightOperand;
@@ -2919,6 +2923,10 @@ OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
 	Node *rightOperand = get_rightop((Expr *) operatorExpression);
 	Var *column = NULL;
 
+	/* strip coercions before doing check */
+	leftOperand = strip_implicit_coercions(leftOperand);
+	rightOperand = strip_implicit_coercions(rightOperand);
+
 	if (IsA(leftOperand, Var))
 	{
 		column = (Var *) leftOperand;
diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c
index 93d05817e..e6cd82b50 100644
--- a/src/backend/distributed/planner/multi_planner.c
+++ b/src/backend/distributed/planner/multi_planner.c
@@ -52,19 +52,10 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 
 	if (NeedsDistributedPlanning(parse))
 	{
-		MemoryContext oldcontext = NULL;
-		MultiPlan *physicalPlan = NULL;
-
-		/* Switch to top level message context */
-		oldcontext = MemoryContextSwitchTo(MessageContext);
-
-		physicalPlan = CreatePhysicalPlan(parse);
+		MultiPlan *physicalPlan = CreatePhysicalPlan(parse);
 
 		/* store required data into the planned statement */
 		result = MultiQueryContainerNode(result, physicalPlan);
-
-		/* Now switch back to original context */
-		MemoryContextSwitchTo(oldcontext);
 	}
 
 	return result;
diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c
index 25fcbf593..1b256695c 100644
--- a/src/backend/distributed/test/connection_cache.c
+++ b/src/backend/distributed/test/connection_cache.c
@@ -48,7 +48,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
 	int32 nodePort = PG_GETARG_INT32(1);
 	PGresult *result = NULL;
 
-	PGconn *connection = GetConnection(nodeName, nodePort);
+	PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
 	if (connection == NULL)
 	{
 		PG_RETURN_BOOL(false);
@@ -79,7 +79,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
 	Datum count = Int32GetDatum(-1);
 	PGresult *result = NULL;
 
-	PGconn *connection = GetConnection(nodeName, nodePort);
+	PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
 	if (connection == NULL)
 	{
 		PG_RETURN_DATUM(count);
@@ -114,7 +114,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS)
 	char *nodeName = PG_GETARG_CSTRING(0);
 	int32 nodePort = PG_GETARG_INT32(1);
 
-	PGconn *connection = GetConnection(nodeName, nodePort);
+	PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
 	if (connection == NULL)
 	{
 		PG_RETURN_BOOL(false);
@@ -136,7 +136,7 @@ set_connection_status_bad(PG_FUNCTION_ARGS)
 	char *nodeName = PG_GETARG_CSTRING(0);
 	int32 nodePort = PG_GETARG_INT32(1);
 
-	PGconn *connection = GetConnection(nodeName, nodePort);
+	PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
 	if (connection == NULL)
 	{
 		PG_RETURN_BOOL(false);
diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c
index 4281a4a59..a4aa14cd7 100644
--- a/src/backend/distributed/utils/connection_cache.c
+++ b/src/backend/distributed/utils/connection_cache.c
@@ -32,7 +32,7 @@
 
 /*
  * NodeConnectionHash is the connection hash itself. It begins uninitialized.
- * The first call to GetConnection triggers hash creation.
+ * The first call to GetOrEstablishConnection triggers hash creation.
  */
 static HTAB *NodeConnectionHash = NULL;
 
@@ -44,10 +44,10 @@ static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
 
 
 /*
- * GetConnection returns a PGconn which can be used to execute queries on a
- * remote PostgreSQL server. If no suitable connection to the specified node on
- * the specified port yet exists, the function establishes a new connection and
- * returns that.
+ * GetOrEstablishConnection returns a PGconn which can be used to execute
+ * queries on a remote PostgreSQL server. If no suitable connection to the
+ * specified node on the specified port yet exists, the function establishes
+ * a new connection and adds it to the connection cache before returning it.
 *
 * Returned connections are guaranteed to be in the CONNECTION_OK state. If the
 * requested connection cannot be established, or if it was previously created
@@ -56,7 +56,7 @@ static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
 * This function throws an error if a hostname over 255 characters is provided.
 */
 PGconn *
-GetConnection(char *nodeName, int32 nodePort)
+GetOrEstablishConnection(char *nodeName, int32 nodePort)
 {
 	PGconn *connection = NULL;
 	NodeConnectionKey nodeConnectionKey;
diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h
index 2a1f997d6..45f61742b 100644
--- a/src/include/distributed/connection_cache.h
+++ b/src/include/distributed/connection_cache.h
@@ -51,7 +51,7 @@ typedef struct NodeConnectionEntry
 
 
 /* function declarations for obtaining and using a connection */
-extern PGconn * GetConnection(char *nodeName, int32 nodePort);
+extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
 extern void PurgeConnection(PGconn *connection);
 extern void ReportRemoteError(PGconn *connection, PGresult *result);
 
diff --git a/src/test/regress/expected/multi_data_types.out b/src/test/regress/expected/multi_data_types.out
index 8dc78f841..18b7a54dc 100644
--- a/src/test/regress/expected/multi_data_types.out
+++ b/src/test/regress/expected/multi_data_types.out
@@ -121,7 +121,7 @@ CREATE TABLE varchar_hash_partitioned_table
 	id int,
 	name varchar
 );
-SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'id', 'hash');
+SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'name', 'hash');
  master_create_distributed_table 
 ---------------------------------
 
@@ -139,16 +139,16 @@ INSERT INTO varchar_hash_partitioned_table VALUES (2, 'Ozgun');
 INSERT INTO varchar_hash_partitioned_table VALUES (3, 'Onder');
 INSERT INTO varchar_hash_partitioned_table VALUES (4, 'Sumedh');
 INSERT INTO varchar_hash_partitioned_table VALUES (5, 'Marco');
-SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Onder';
+SELECT * FROM varchar_hash_partitioned_table WHERE id = 1;
  id | name  
 ----+-------
-  3 | Onder
+  1 | Jason
 (1 row)
 
-UPDATE varchar_hash_partitioned_table SET name = 'Samay' WHERE id = 5;
-SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Samay';
+UPDATE varchar_hash_partitioned_table SET id = 6 WHERE name = 'Jason';
+SELECT * FROM varchar_hash_partitioned_table WHERE id = 6;
  id | name  
 ----+-------
-  5 | Samay
+  6 | Jason
 (1 row)
 
diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out
index 5da80026b..d48c47551 100644
--- a/src/test/regress/expected/multi_modifications.out
+++ b/src/test/regress/expected/multi_modifications.out
@@ -61,10 +61,14 @@ SELECT master_create_empty_shard('range_partitioned') AS new_shard_id
 \gset
 UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999
 WHERE shardid = :new_shard_id;
--- create append-partitioned shard
+-- create append-partitioned shards
 SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
 \gset
-UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 100000
+UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
 WHERE shardid = :new_shard_id;
 -- basic single-row INSERT
 INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
@@ -82,7 +86,10 @@ ERROR: distributed modifications must target exactly one shard
 -- try an insert to a range-partitioned table
 INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                       20.69);
--- ensure the value is where we put it and query to find it is properly pruned
+-- also insert to an append-partitioned table
+INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
+                                       20.69);
+-- ensure the values are where we put them and query to ensure they are properly pruned
 SET client_min_messages TO 'DEBUG2';
 SET citusdb.task_executor_type TO 'router';
 SELECT * FROM range_partitioned WHERE id = 32743;
@@ -92,17 +99,23 @@ DEBUG: predicate pruning for shardId 103070
  32743 | AAPL   |      9580 | Tue Oct 19 10:23:54 2004 | buy  |       20.69
 (1 row)
 
+SELECT * FROM append_partitioned WHERE id = 414123;
+DEBUG: predicate pruning for shardId 103072
+   id   | symbol | bidder_id |        placed_at         | kind | limit_price 
+--------+--------+-----------+--------------------------+------+-------------
+ 414123 | AAPL   |      9580 | Tue Oct 19 10:23:54 2004 | buy  |       20.69
+(1 row)
+
 SET client_min_messages TO DEFAULT;
 SET citusdb.task_executor_type TO DEFAULT;
--- also try inserting without a range-partitioned shard to receive the value
+-- try inserting without a range-partitioned shard to receive the value
 INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                       20.69);
 ERROR: distributed modifications must target exactly one shard
--- try an insert to an append-partitioned table
-INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
+-- and insert into an append-partitioned table with a value that spans shards:
+INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                        20.69);
-ERROR: cannot perform distributed planning for the given modification
-DETAIL: Only hash- or range-partitioned tables may be the target of distributed modifications
+ERROR: distributed modifications must target exactly one shard
 -- INSERT with DEFAULT in the target list
 INSERT INTO limit_orders VALUES (12756, 'MSFT', 10959, '2013-05-08 07:29:23', 'sell',
                                  DEFAULT);
diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out
index 4407ec697..734dfbde2 100644
--- a/src/test/regress/expected/multi_simple_queries.out
+++ b/src/test/regress/expected/multi_simple_queries.out
@@ -309,7 +309,7 @@ SET client_min_messages TO 'DEBUG2';
 SELECT * FROM articles
 	WHERE author_id = 1;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
  id | author_id |    title     | word_count 
 ----+-----------+--------------+------------
   1 |         1 | arsenous     |       9572
@@ -323,7 +323,7 @@ SELECT * FROM articles
 	WHERE author_id = 1 OR author_id = 17;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
  id | author_id |    title     | word_count 
 ----+-----------+--------------+------------
   1 |         1 | arsenous     |       9572
@@ -343,7 +343,7 @@ HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 SELECT id as article_id, word_count * id as random_value
 	FROM articles
 	WHERE author_id = 1;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
  article_id | random_value 
 ------------+--------------
           1 |         9572
@@ -360,7 +360,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
 	WHERE a.author_id = 10 and a.author_id = b.author_id
 	LIMIT 3;
 DEBUG: push down of limit count: 3
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
  first_author | second_word_count 
 --------------+-------------------
@@ -375,7 +375,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
 	WHERE a.author_id = 10 and a.author_id = b.author_id
 	LIMIT 3;
 DEBUG: push down of limit count: 3
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 ERROR: cannot use router executor with JOINs
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- do not create the master query for LIMIT on a single shard SELECT
@@ -384,7 +384,7 @@ SELECT *
 	WHERE author_id = 1
 	LIMIT 2;
 DEBUG: push down of limit count: 2
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
  id | author_id |  title   | word_count 
 ----+-----------+----------+------------
   1 |         1 | arsenous |       9572
@@ -398,7 +398,7 @@ SELECT id
 	FROM articles
 	WHERE author_id = 1
 	GROUP BY id;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
  id 
 ----
  41
@@ -415,7 +415,7 @@ COPY articles_single_shard TO stdout;
 SELECT avg(word_count)
 	FROM articles
 	WHERE author_id = 2;
-DEBUG: predicate pruning for shardId 103092
+DEBUG: predicate pruning for shardId 103093
 ERROR: cannot use router executor with aggregates
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- max, min, sum, count is somehow implemented
@@ -424,7 +424,7 @@ SELECT max(word_count) as max, min(word_count) as min,
 	sum(word_count) as sum, count(word_count) as cnt
 	FROM articles
 	WHERE author_id = 2;
-DEBUG: predicate pruning for shardId 103092
+DEBUG: predicate pruning for shardId 103093
 ERROR: cannot use router executor with aggregates
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- error out for queries with ORDER BY
@@ -432,7 +432,7 @@ SELECT *
 	FROM articles
 	WHERE author_id = 1
 	ORDER BY word_count;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 ERROR: cannot use router executor with ORDER BY clauses
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- error out for queries with ORDER BY and LIMIT
@@ -442,7 +442,7 @@ SELECT *
 	ORDER BY word_count
 	LIMIT 2;
 DEBUG: push down of limit count: 2
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 ERROR: cannot use router executor with ORDER BY clauses
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- error out for queries with aggregates and GROUP BY
@@ -450,14 +450,14 @@ SELECT max(word_count)
 	FROM articles
 	WHERE author_id = 1
 	GROUP BY author_id;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 ERROR: cannot use router executor with aggregates
 HINT: Set citusdb.task_executor_type to "real-time" or "task-tracker".
 -- error out for queries with repartition jobs
 SELECT *
 	FROM articles a, articles b
 	WHERE a.id = b.id AND a.author_id = 1;
-DEBUG: predicate pruning for shardId 103093
+DEBUG: predicate pruning for shardId 103094
 DEBUG: join prunable for task partitionId 0 and 1
 DEBUG: join prunable for task partitionId 0 and 2
 DEBUG: join prunable for task partitionId 0 and 3
diff --git a/src/test/regress/sql/multi_data_types.sql b/src/test/regress/sql/multi_data_types.sql
index 99c603e63..68b84044f 100644
--- a/src/test/regress/sql/multi_data_types.sql
+++ b/src/test/regress/sql/multi_data_types.sql
@@ -104,7 +104,7 @@ CREATE TABLE varchar_hash_partitioned_table
 	name varchar
 );
 
-SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'id', 'hash');
+SELECT master_create_distributed_table('varchar_hash_partitioned_table', 'name', 'hash');
 SELECT master_create_worker_shards('varchar_hash_partitioned_table', 4, 1);
 
 -- execute INSERT, SELECT and UPDATE queries on composite_type_partitioned_table
@@ -114,8 +114,8 @@ INSERT INTO varchar_hash_partitioned_table VALUES (3, 'Onder');
 INSERT INTO varchar_hash_partitioned_table VALUES (4, 'Sumedh');
 INSERT INTO varchar_hash_partitioned_table VALUES (5, 'Marco');
 
-SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Onder';
+SELECT * FROM varchar_hash_partitioned_table WHERE id = 1;
 
-UPDATE varchar_hash_partitioned_table SET name = 'Samay' WHERE id = 5;
+UPDATE varchar_hash_partitioned_table SET id = 6 WHERE name = 'Jason';
 
-SELECT * FROM varchar_hash_partitioned_table WHERE name = 'Samay';
+SELECT * FROM varchar_hash_partitioned_table WHERE id = 6;
diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql
index 040411079..390913dfb 100644
--- a/src/test/regress/sql/multi_modifications.sql
+++ b/src/test/regress/sql/multi_modifications.sql
@@ -40,10 +40,15 @@ SELECT master_create_empty_shard('range_partitioned') AS new_shard_id
 \gset
 UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999
 WHERE shardid = :new_shard_id;
 
--- create append-partitioned shard
+-- create append-partitioned shards
 SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
 \gset
-UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 100000
+UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
+WHERE shardid = :new_shard_id;
+
+SELECT master_create_empty_shard('append_partitioned') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
 WHERE shardid = :new_shard_id;
 -- basic single-row INSERT
@@ -59,19 +64,23 @@ INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:5
 INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                       20.69);
--- ensure the value is where we put it and query to find it is properly pruned
+-- also insert to an append-partitioned table
+INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
+                                       20.69);
+-- ensure the values are where we put them and query to ensure they are properly pruned
 SET client_min_messages TO 'DEBUG2';
 SET citusdb.task_executor_type TO 'router';
 SELECT * FROM range_partitioned WHERE id = 32743;
+SELECT * FROM append_partitioned WHERE id = 414123;
 
 SET client_min_messages TO DEFAULT;
 SET citusdb.task_executor_type TO DEFAULT;
 
--- also try inserting without a range-partitioned shard to receive the value
+-- try inserting without a range-partitioned shard to receive the value
 INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                       20.69);
 
--- try an insert to an append-partitioned table
-INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
+-- and insert into an append-partitioned table with a value that spans shards:
+INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
                                        20.69);
 
 -- INSERT with DEFAULT in the target list
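
Note (illustrative sketch, not part of the patch): assuming the two append_partitioned shards created above, with ranges [0, 500000] and [500000, 1000000], the regression changes exercise the behavior this patch enables: a single-row INSERT whose partition value prunes to exactly one append shard is now planned and routed, while a value covered by more than one shard is still rejected.

    -- prunes to the single shard covering 414123, so the modification is routed to it
    INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69);

    -- 500000 lies in both shard ranges, so planning still fails with
    -- ERROR: distributed modifications must target exactly one shard
    INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69);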