diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 961345a12..491c4e6eb 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -19,6 +19,8 @@ #include "miscadmin.h" #include "access/xact.h" +#include "access/transam.h" +#include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" @@ -33,6 +35,7 @@ #include "utils/builtins.h" #include "utils/elog.h" #include "utils/errcodes.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/palloc.h" #include "utils/int8.h" @@ -55,7 +58,11 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip DestReceiver *destination, Tuplestorestate *tupleStore); static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString); -static bool SendQueryInSingleRowMode(PGconn *connection, char *query); +static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues); +static bool SendQueryInSingleRowMode(PGconn *connection, char *query, + ParamListInfo paramListInfo); static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, TupleDesc tupleDescriptor, int64 *rows); static bool ConsumeQueryResult(PGconn *connection, int64 *rows); @@ -316,6 +323,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, TupleDesc tupleDescriptor = queryDesc->tupDesc; EState *executorState = queryDesc->estate; MaterialState *routerState = (MaterialState *) queryDesc->planstate; + ParamListInfo paramListInfo = queryDesc->params; bool resultsOK = false; List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; @@ -359,7 +367,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, continue; } - queryOK = SendQueryInSingleRowMode(connection, queryString); + queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { PurgeConnection(connection); @@ -515,12 +523,28 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, * connection so that we receive results a row at a time. */ static bool -SendQueryInSingleRowMode(PGconn *connection, char *query) +SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo) { int querySent = 0; int singleRowMode = 0; - querySent = PQsendQuery(connection, query); + if (paramListInfo != NULL) + { + int parameterCount = paramListInfo->numParams; + Oid *parameterTypes = NULL; + const char **parameterValues = NULL; + + ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, + ¶meterValues); + + querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes, + parameterValues, NULL, NULL, 0); + } + else + { + querySent = PQsendQuery(connection, query); + } + if (querySent == 0) { WarnRemoteError(connection, NULL); @@ -538,6 +562,55 @@ SendQueryInSingleRowMode(PGconn *connection, char *query) } +/* + * ExtractParametersFromParamListInfo extracts parameter types and values from + * the given ParamListInfo structure, and fills parameter type and value arrays. + */ +static void +ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, + const char ***parameterValues) +{ + int parameterIndex = 0; + int parameterCount = paramListInfo->numParams; + + *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); + *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); + + /* get parameter types and values */ + for (parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++) + { + ParamExternData *parameterData = ¶mListInfo->params[parameterIndex]; + Oid typeOutputFunctionId = InvalidOid; + bool variableLengthType = false; + + /* + * Use 0 for data types where the oid values can be different on + * the master and worker nodes. Therefore, the worker nodes can + * infer the correct oid. + */ + if (parameterData->ptype >= FirstNormalObjectId) + { + (*parameterTypes)[parameterIndex] = 0; + } + else + { + (*parameterTypes)[parameterIndex] = parameterData->ptype; + } + + if (parameterData->isnull) + { + (*parameterValues)[parameterIndex] = NULL; + continue; + } + + getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, + &variableLengthType); + (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, + parameterData->value); + } +} + + /* * StoreQueryResult gets the query results from the given connection, builds * tuples from the results, and stores them in the a newly created diff --git a/src/test/regress/expected/multi_create_insert_proxy.out b/src/test/regress/expected/multi_create_insert_proxy.out index 9cc5ece73..a18205995 100644 --- a/src/test/regress/expected/multi_create_insert_proxy.out +++ b/src/test/regress/expected/multi_create_insert_proxy.out @@ -64,23 +64,29 @@ CREATE TEMPORARY SEQUENCE rows_inserted; SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename \gset -- insert to proxy, again relying on default value --- will be fixed with another PR --- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); +INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); -- test copy with bad row in middle --- will be fixed with another PR \set VERBOSITY terse COPY pg_temp.:"proxy_tablename" FROM stdin; -ERROR: could not modify any active placements +ERROR: null value in column "data" violates not-null constraint \set VERBOSITY default -- verify rows were copied to distributed table --- will be fixed with another PR SELECT * FROM insert_target ORDER BY id ASC; - id | data -----+------ -(0 rows) + id | data +----+----------------------------- + 1 | lorem ipsum + 2 | dolor sit amet + 3 | consectetur adipiscing elit + 4 | sed do eiusmod + 5 | tempor incididunt ut + 6 | labore et dolore +(6 rows) -- the counter should match the number of rows stored --- will be fixed with another PR SELECT currval('rows_inserted'); -ERROR: currval of sequence "rows_inserted" is not yet defined in this session + currval +--------- + 6 +(1 row) + SET client_min_messages TO DEFAULT; diff --git a/src/test/regress/expected/multi_prepare_sql.out b/src/test/regress/expected/multi_prepare_sql.out index 3a54596a6..ae04d6ee0 100644 --- a/src/test/regress/expected/multi_prepare_sql.out +++ b/src/test/regress/expected/multi_prepare_sql.out @@ -242,5 +242,101 @@ EXECUTE prepared_test_6(155); -- FIXME: temporarily disabled -- EXECUTE prepared_test_6(1555); +-- test router executor with parameterized non-partition columns +-- create a custom type which also exists on worker nodes +CREATE TYPE test_composite_type AS ( + i integer, + i2 integer +); +CREATE TABLE router_executor_table ( + id bigint NOT NULL, + comment varchar(20), + stats test_composite_type +); +SELECT master_create_distributed_table('router_executor_table', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('router_executor_table', 2, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +-- test parameterized inserts +PREPARE prepared_insert(varchar(20)) AS + INSERT INTO router_executor_table VALUES (1, $1, $2); +EXECUTE prepared_insert('comment-1', '(1, 10)'); +EXECUTE prepared_insert('comment-2', '(2, 20)'); +EXECUTE prepared_insert('comment-3', '(3, 30)'); +EXECUTE prepared_insert('comment-4', '(4, 40)'); +EXECUTE prepared_insert('comment-5', '(5, 50)'); +EXECUTE prepared_insert('comment-6', '(6, 60)'); +SELECT * FROM router_executor_table ORDER BY comment; + id | comment | stats +----+-----------+-------- + 1 | comment-1 | (1,10) + 1 | comment-2 | (2,20) + 1 | comment-3 | (3,30) + 1 | comment-4 | (4,40) + 1 | comment-5 | (5,50) + 1 | comment-6 | (6,60) +(6 rows) + +-- test parameterized selects +PREPARE prepared_select(integer, integer) AS + SELECT count(*) FROM router_executor_table + WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type; +EXECUTE prepared_select(1, 10); + count +------- + 1 +(1 row) + +EXECUTE prepared_select(2, 20); + count +------- + 1 +(1 row) + +EXECUTE prepared_select(3, 30); + count +------- + 1 +(1 row) + +EXECUTE prepared_select(4, 40); + count +------- + 1 +(1 row) + +EXECUTE prepared_select(5, 50); + count +------- + 1 +(1 row) + +EXECUTE prepared_select(6, 60); + count +------- + 1 +(1 row) + +-- test that we don't crash on failing parameterized insert on the partition column +PREPARE prepared_partition_column_insert(bigint) AS +INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)'); +-- we error out on the 6th execution +EXECUTE prepared_partition_column_insert(1); +EXECUTE prepared_partition_column_insert(2); +EXECUTE prepared_partition_column_insert(3); +EXECUTE prepared_partition_column_insert(4); +EXECUTE prepared_partition_column_insert(5); +EXECUTE prepared_partition_column_insert(6); +ERROR: values given for the partition column must be constants or constant expressions +DROP TYPE test_composite_type CASCADE; +NOTICE: drop cascades to table router_executor_table column stats -- clean-up prepared statements DEALLOCATE ALL; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index d2b252b93..3001c23b7 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1041,7 +1041,6 @@ DEBUG: Plan is router executable (5 rows) -- parametric prepare queries can be router plannable --- it will be fixed with another pr PREPARE author_articles(int) as SELECT * FROM articles_hash @@ -1050,9 +1049,15 @@ EXECUTE author_articles(1); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 DEBUG: Plan is router executable -WARNING: there is no parameter $1 -CONTEXT: while executing command on localhost:57637 -ERROR: could not receive query results + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + -- queries inside plpgsql functions could be router plannable CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$ DECLARE diff --git a/src/test/regress/sql/multi_create_insert_proxy.sql b/src/test/regress/sql/multi_create_insert_proxy.sql index ed977f8ad..ee8de14d8 100644 --- a/src/test/regress/sql/multi_create_insert_proxy.sql +++ b/src/test/regress/sql/multi_create_insert_proxy.sql @@ -60,11 +60,9 @@ SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_ \gset -- insert to proxy, again relying on default value --- will be fixed with another PR --- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); +INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); -- test copy with bad row in middle --- will be fixed with another PR \set VERBOSITY terse COPY pg_temp.:"proxy_tablename" FROM stdin; 2 dolor sit amet @@ -78,11 +76,9 @@ COPY pg_temp.:"proxy_tablename" FROM stdin; \set VERBOSITY default -- verify rows were copied to distributed table --- will be fixed with another PR SELECT * FROM insert_target ORDER BY id ASC; -- the counter should match the number of rows stored --- will be fixed with another PR SELECT currval('rows_inserted'); SET client_min_messages TO DEFAULT; diff --git a/src/test/regress/sql/multi_prepare_sql.sql b/src/test/regress/sql/multi_prepare_sql.sql index 18417729c..1fd889c89 100644 --- a/src/test/regress/sql/multi_prepare_sql.sql +++ b/src/test/regress/sql/multi_prepare_sql.sql @@ -152,5 +152,62 @@ EXECUTE prepared_test_6(155); -- FIXME: temporarily disabled -- EXECUTE prepared_test_6(1555); +-- test router executor with parameterized non-partition columns + +-- create a custom type which also exists on worker nodes +CREATE TYPE test_composite_type AS ( + i integer, + i2 integer +); + +CREATE TABLE router_executor_table ( + id bigint NOT NULL, + comment varchar(20), + stats test_composite_type +); + +SELECT master_create_distributed_table('router_executor_table', 'id', 'hash'); +SELECT master_create_worker_shards('router_executor_table', 2, 2); + +-- test parameterized inserts +PREPARE prepared_insert(varchar(20)) AS + INSERT INTO router_executor_table VALUES (1, $1, $2); + +EXECUTE prepared_insert('comment-1', '(1, 10)'); +EXECUTE prepared_insert('comment-2', '(2, 20)'); +EXECUTE prepared_insert('comment-3', '(3, 30)'); +EXECUTE prepared_insert('comment-4', '(4, 40)'); +EXECUTE prepared_insert('comment-5', '(5, 50)'); +EXECUTE prepared_insert('comment-6', '(6, 60)'); + +SELECT * FROM router_executor_table ORDER BY comment; + +-- test parameterized selects +PREPARE prepared_select(integer, integer) AS + SELECT count(*) FROM router_executor_table + WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type; + +EXECUTE prepared_select(1, 10); +EXECUTE prepared_select(2, 20); +EXECUTE prepared_select(3, 30); +EXECUTE prepared_select(4, 40); +EXECUTE prepared_select(5, 50); +EXECUTE prepared_select(6, 60); + +-- test that we don't crash on failing parameterized insert on the partition column + +PREPARE prepared_partition_column_insert(bigint) AS +INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)'); + +-- we error out on the 6th execution +EXECUTE prepared_partition_column_insert(1); +EXECUTE prepared_partition_column_insert(2); +EXECUTE prepared_partition_column_insert(3); +EXECUTE prepared_partition_column_insert(4); +EXECUTE prepared_partition_column_insert(5); +EXECUTE prepared_partition_column_insert(6); + +DROP TYPE test_composite_type CASCADE; + -- clean-up prepared statements DEALLOCATE ALL; diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 221cb7ef3..8812e4520 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -475,7 +475,6 @@ PREPARE author_1_articles as EXECUTE author_1_articles; -- parametric prepare queries can be router plannable --- it will be fixed with another pr PREPARE author_articles(int) as SELECT * FROM articles_hash