Add support for prepared statements with parameterized non-partition columns in router executor

pull/649/head
Metin Doslu 2016-07-13 11:07:42 +03:00
parent 082b6b9416
commit a811e09dd4
7 changed files with 256 additions and 24 deletions

View File

@@ -19,6 +19,8 @@
#include "miscadmin.h"
#include "access/xact.h"
#include "access/transam.h"
#include "catalog/pg_type.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_cache.h"
@@ -33,6 +35,7 @@
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/int8.h"
@@ -55,7 +58,11 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
DestReceiver *destination,
Tuplestorestate *tupleStore);
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
ParamListInfo paramListInfo);
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
TupleDesc tupleDescriptor, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
@@ -316,6 +323,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
TupleDesc tupleDescriptor = queryDesc->tupDesc;
EState *executorState = queryDesc->estate;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
ParamListInfo paramListInfo = queryDesc->params;
bool resultsOK = false;
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
@@ -359,7 +367,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
continue;
}
queryOK = SendQueryInSingleRowMode(connection, queryString);
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
PurgeConnection(connection);
@@ -515,12 +523,28 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
* connection so that we receive results a row at a time.
*/
static bool
SendQueryInSingleRowMode(PGconn *connection, char *query)
SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
{
int querySent = 0;
int singleRowMode = 0;
if (paramListInfo != NULL)
{
int parameterCount = paramListInfo->numParams;
Oid *parameterTypes = NULL;
const char **parameterValues = NULL;
ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
&parameterValues);
querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
parameterValues, NULL, NULL, 0);
}
else
{
querySent = PQsendQuery(connection, query);
}
if (querySent == 0)
{
WarnRemoteError(connection, NULL);
@@ -538,6 +562,55 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
}
/*
 * ExtractParametersFromParamListInfo walks the parameters in the given
 * ParamListInfo and produces two parallel arrays suitable for libpq's
 * PQsendQueryParams: one of type oids and one of textual values. Null
 * parameters get a NULL value entry; all other datums are rendered to
 * text via the type's output function. Both output arrays are freshly
 * palloc'd in the current memory context.
 */
static void
ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes,
								   const char ***parameterValues)
{
	int parameterCount = paramListInfo->numParams;
	int parameterIndex = 0;

	*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
	*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));

	for (parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)
	{
		ParamExternData *parameterData = &paramListInfo->params[parameterIndex];
		Oid typeOutputFunctionId = InvalidOid;
		bool variableLengthType = false;

		/*
		 * Send type oid 0 for user-defined types: their oids can differ
		 * between the master and the worker nodes, so we let the worker
		 * infer the correct type instead. Built-in type oids (below
		 * FirstNormalObjectId) are stable and are sent as-is.
		 */
		(*parameterTypes)[parameterIndex] =
			(parameterData->ptype >= FirstNormalObjectId) ? 0 : parameterData->ptype;

		if (parameterData->isnull)
		{
			(*parameterValues)[parameterIndex] = NULL;
			continue;
		}

		/* render the non-null datum to its textual representation */
		getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId,
						  &variableLengthType);

		(*parameterValues)[parameterIndex] =
			OidOutputFunctionCall(typeOutputFunctionId, parameterData->value);
	}
}
/*
* StoreQueryResult gets the query results from the given connection, builds
* tuples from the results, and stores them in the a newly created

View File

@ -64,23 +64,29 @@ CREATE TEMPORARY SEQUENCE rows_inserted;
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
\gset
-- insert to proxy, again relying on default value
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
ERROR: could not modify any active placements
ERROR: null value in column "data" violates not-null constraint
\set VERBOSITY default
-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;
id | data
----+------
(0 rows)
----+-----------------------------
1 | lorem ipsum
2 | dolor sit amet
3 | consectetur adipiscing elit
4 | sed do eiusmod
5 | tempor incididunt ut
6 | labore et dolore
(6 rows)
-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');
ERROR: currval of sequence "rows_inserted" is not yet defined in this session
currval
---------
6
(1 row)
SET client_min_messages TO DEFAULT;

View File

@ -242,5 +242,101 @@ EXECUTE prepared_test_6(155);
-- FIXME: temporarily disabled
-- EXECUTE prepared_test_6(1555);
-- test router executor with parameterized non-partition columns
-- create a custom type which also exists on worker nodes
CREATE TYPE test_composite_type AS (
i integer,
i2 integer
);
CREATE TABLE router_executor_table (
id bigint NOT NULL,
comment varchar(20),
stats test_composite_type
);
SELECT master_create_distributed_table('router_executor_table', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('router_executor_table', 2, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- test parameterized inserts
PREPARE prepared_insert(varchar(20)) AS
INSERT INTO router_executor_table VALUES (1, $1, $2);
EXECUTE prepared_insert('comment-1', '(1, 10)');
EXECUTE prepared_insert('comment-2', '(2, 20)');
EXECUTE prepared_insert('comment-3', '(3, 30)');
EXECUTE prepared_insert('comment-4', '(4, 40)');
EXECUTE prepared_insert('comment-5', '(5, 50)');
EXECUTE prepared_insert('comment-6', '(6, 60)');
SELECT * FROM router_executor_table ORDER BY comment;
id | comment | stats
----+-----------+--------
1 | comment-1 | (1,10)
1 | comment-2 | (2,20)
1 | comment-3 | (3,30)
1 | comment-4 | (4,40)
1 | comment-5 | (5,50)
1 | comment-6 | (6,60)
(6 rows)
-- test parameterized selects
PREPARE prepared_select(integer, integer) AS
SELECT count(*) FROM router_executor_table
WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type;
EXECUTE prepared_select(1, 10);
count
-------
1
(1 row)
EXECUTE prepared_select(2, 20);
count
-------
1
(1 row)
EXECUTE prepared_select(3, 30);
count
-------
1
(1 row)
EXECUTE prepared_select(4, 40);
count
-------
1
(1 row)
EXECUTE prepared_select(5, 50);
count
-------
1
(1 row)
EXECUTE prepared_select(6, 60);
count
-------
1
(1 row)
-- test that we don't crash on failing parameterized insert on the partition column
PREPARE prepared_partition_column_insert(bigint) AS
INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');
-- we error out on the 6th execution
EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3);
EXECUTE prepared_partition_column_insert(4);
EXECUTE prepared_partition_column_insert(5);
EXECUTE prepared_partition_column_insert(6);
ERROR: values given for the partition column must be constants or constant expressions
DROP TYPE test_composite_type CASCADE;
NOTICE: drop cascades to table router_executor_table column stats
-- clean-up prepared statements
DEALLOCATE ALL;

View File

@ -1041,7 +1041,6 @@ DEBUG: Plan is router executable
(5 rows)
-- parametric prepare queries can be router plannable
-- it will be fixed with another pr
PREPARE author_articles(int) as
SELECT *
FROM articles_hash
@ -1050,9 +1049,15 @@ EXECUTE author_articles(1);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 840001
DEBUG: Plan is router executable
WARNING: there is no parameter $1
CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
id | author_id | title | word_count
----+-----------+--------------+------------
1 | 1 | arsenous | 9572
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
41 | 1 | aznavour | 11814
(5 rows)
-- queries inside plpgsql functions could be router plannable
CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$
DECLARE

View File

@ -60,11 +60,9 @@ SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_
\gset
-- insert to proxy, again relying on default value
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
2 dolor sit amet
@ -78,11 +76,9 @@ COPY pg_temp.:"proxy_tablename" FROM stdin;
\set VERBOSITY default
-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;
-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');
SET client_min_messages TO DEFAULT;

View File

@ -152,5 +152,62 @@ EXECUTE prepared_test_6(155);
-- FIXME: temporarily disabled
-- EXECUTE prepared_test_6(1555);
-- test router executor with parameterized non-partition columns
-- create a custom type which also exists on worker nodes
CREATE TYPE test_composite_type AS (
i integer,
i2 integer
);
CREATE TABLE router_executor_table (
id bigint NOT NULL,
comment varchar(20),
stats test_composite_type
);
SELECT master_create_distributed_table('router_executor_table', 'id', 'hash');
SELECT master_create_worker_shards('router_executor_table', 2, 2);
-- test parameterized inserts
PREPARE prepared_insert(varchar(20)) AS
INSERT INTO router_executor_table VALUES (1, $1, $2);
EXECUTE prepared_insert('comment-1', '(1, 10)');
EXECUTE prepared_insert('comment-2', '(2, 20)');
EXECUTE prepared_insert('comment-3', '(3, 30)');
EXECUTE prepared_insert('comment-4', '(4, 40)');
EXECUTE prepared_insert('comment-5', '(5, 50)');
EXECUTE prepared_insert('comment-6', '(6, 60)');
SELECT * FROM router_executor_table ORDER BY comment;
-- test parameterized selects
PREPARE prepared_select(integer, integer) AS
SELECT count(*) FROM router_executor_table
WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type;
EXECUTE prepared_select(1, 10);
EXECUTE prepared_select(2, 20);
EXECUTE prepared_select(3, 30);
EXECUTE prepared_select(4, 40);
EXECUTE prepared_select(5, 50);
EXECUTE prepared_select(6, 60);
-- test that we don't crash on failing parameterized insert on the partition column
PREPARE prepared_partition_column_insert(bigint) AS
INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');
-- we error out on the 6th execution
EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3);
EXECUTE prepared_partition_column_insert(4);
EXECUTE prepared_partition_column_insert(5);
EXECUTE prepared_partition_column_insert(6);
DROP TYPE test_composite_type CASCADE;
-- clean-up prepared statements
DEALLOCATE ALL;

View File

@ -475,7 +475,6 @@ PREPARE author_1_articles as
EXECUTE author_1_articles;
-- parametric prepare queries can be router plannable
-- it will be fixed with another pr
PREPARE author_articles(int) as
SELECT *
FROM articles_hash