mirror of https://github.com/citusdata/citus.git
Merge pull request #649 from citusdata/router_executor_prepare_support
Router executor prepare support for non-partition columns

commit bdd57ec9d0
@@ -19,6 +19,8 @@
#include "miscadmin.h"
#include "access/xact.h"
#include "access/transam.h"
#include "catalog/pg_type.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_cache.h"
@@ -33,6 +35,7 @@
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/int8.h"
@@ -55,7 +58,11 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
                                       DestReceiver *destination,
                                       Tuplestorestate *tupleStore);
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
                                               Oid **parameterTypes,
                                               const char ***parameterValues);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
                                     ParamListInfo paramListInfo);
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
                             TupleDesc tupleDescriptor, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
@@ -316,6 +323,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
	TupleDesc tupleDescriptor = queryDesc->tupDesc;
	EState *executorState = queryDesc->estate;
	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
	ParamListInfo paramListInfo = queryDesc->params;
	bool resultsOK = false;
	List *taskPlacementList = task->taskPlacementList;
	ListCell *taskPlacementCell = NULL;
@@ -359,7 +367,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
			continue;
		}

		queryOK = SendQueryInSingleRowMode(connection, queryString);
		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
		if (!queryOK)
		{
			PurgeConnection(connection);
@@ -515,12 +523,28 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 * connection so that we receive results a row at a time.
 */
static bool
SendQueryInSingleRowMode(PGconn *connection, char *query)
SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
{
	int querySent = 0;
	int singleRowMode = 0;

	querySent = PQsendQuery(connection, query);
	if (paramListInfo != NULL)
	{
		int parameterCount = paramListInfo->numParams;
		Oid *parameterTypes = NULL;
		const char **parameterValues = NULL;

		ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
										   &parameterValues);

		querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
									  parameterValues, NULL, NULL, 0);
	}
	else
	{
		querySent = PQsendQuery(connection, query);
	}

	if (querySent == 0)
	{
		WarnRemoteError(connection, NULL);
@@ -538,6 +562,55 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
}


/*
 * ExtractParametersFromParamListInfo extracts parameter types and values from
 * the given ParamListInfo structure, and fills parameter type and value arrays.
 */
static void
ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes,
								   const char ***parameterValues)
{
	int parameterIndex = 0;
	int parameterCount = paramListInfo->numParams;

	*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
	*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));

	/* get parameter types and values */
	for (parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)
	{
		ParamExternData *parameterData = &paramListInfo->params[parameterIndex];
		Oid typeOutputFunctionId = InvalidOid;
		bool variableLengthType = false;

		/*
		 * Use 0 for data types where the oid values can be different on
		 * the master and worker nodes. Therefore, the worker nodes can
		 * infer the correct oid.
		 */
		if (parameterData->ptype >= FirstNormalObjectId)
		{
			(*parameterTypes)[parameterIndex] = 0;
		}
		else
		{
			(*parameterTypes)[parameterIndex] = parameterData->ptype;
		}

		if (parameterData->isnull)
		{
			(*parameterValues)[parameterIndex] = NULL;
			continue;
		}

		getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId,
						  &variableLengthType);
		(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId,
																   parameterData->value);
	}
}


/*
 * StoreQueryResult gets the query results from the given connection, builds
 * tuples from the results, and stores them in the a newly created
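For context, the libpq flow the patch adopts above is the standard one for parameterized queries: serialize each parameter to its text representation, pass the arrays to PQsendQueryParams() (sending 0 for type OIDs the worker should infer itself), and request single-row mode before reading results. The sketch below shows that flow in isolation; it is not part of the patch, and the connection string, shard table name, and parameter value are invented for illustration.

/*
 * Standalone sketch of the libpq pattern used by SendQueryInSingleRowMode:
 * send a parameterized query with PQsendQueryParams(), enable single-row
 * mode, and drain the results one row at a time. Hypothetical example only.
 */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* hypothetical connection string and shard table name */
	PGconn *connection = PQconnectdb("host=localhost port=5432 dbname=postgres");
	const char *query = "SELECT id, comment FROM router_executor_table_102030 WHERE id = $1";
	const char *parameterValues[1] = { "1" };
	PGresult *result = NULL;

	if (PQstatus(connection) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(connection));
		PQfinish(connection);
		return 1;
	}

	/* type OIDs are passed as NULL, so the server infers them from context */
	if (PQsendQueryParams(connection, query, 1, NULL, parameterValues,
						  NULL, NULL, 0) == 0)
	{
		fprintf(stderr, "send failed: %s", PQerrorMessage(connection));
		PQfinish(connection);
		return 1;
	}

	/* single-row mode must be requested before the first PQgetResult() call */
	(void) PQsetSingleRowMode(connection);

	/* each row arrives as its own PGRES_SINGLE_TUPLE result */
	while ((result = PQgetResult(connection)) != NULL)
	{
		if (PQresultStatus(result) == PGRES_SINGLE_TUPLE)
		{
			printf("id=%s comment=%s\n",
				   PQgetvalue(result, 0, 0), PQgetvalue(result, 0, 1));
		}
		PQclear(result);
	}

	PQfinish(connection);
	return 0;
}

In the patch itself the values come from the ParamListInfo and are rendered with getTypeOutputInfo()/OidOutputFunctionCall(), and type OIDs at or above FirstNormalObjectId are replaced with 0 so that custom types resolve correctly on the worker.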
@@ -64,23 +64,29 @@ CREATE TEMPORARY SEQUENCE rows_inserted;
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
\gset
-- insert to proxy, again relying on default value
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
ERROR: could not modify any active placements
ERROR: null value in column "data" violates not-null constraint
\set VERBOSITY default
-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;
 id | data
----+------
(0 rows)

 id |            data
----+-----------------------------
  1 | lorem ipsum
  2 | dolor sit amet
  3 | consectetur adipiscing elit
  4 | sed do eiusmod
  5 | tempor incididunt ut
  6 | labore et dolore
(6 rows)

-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');
ERROR: currval of sequence "rows_inserted" is not yet defined in this session
 currval
---------
       6
(1 row)

SET client_min_messages TO DEFAULT;
@@ -242,5 +242,101 @@ EXECUTE prepared_test_6(155);
-- FIXME: temporarily disabled
-- EXECUTE prepared_test_6(1555);
-- test router executor with parameterized non-partition columns
-- create a custom type which also exists on worker nodes
CREATE TYPE test_composite_type AS (
    i integer,
    i2 integer
);
CREATE TABLE router_executor_table (
    id bigint NOT NULL,
    comment varchar(20),
    stats test_composite_type
);
SELECT master_create_distributed_table('router_executor_table', 'id', 'hash');
 master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('router_executor_table', 2, 2);
 master_create_worker_shards
-----------------------------

(1 row)

-- test parameterized inserts
PREPARE prepared_insert(varchar(20)) AS
    INSERT INTO router_executor_table VALUES (1, $1, $2);
EXECUTE prepared_insert('comment-1', '(1, 10)');
EXECUTE prepared_insert('comment-2', '(2, 20)');
EXECUTE prepared_insert('comment-3', '(3, 30)');
EXECUTE prepared_insert('comment-4', '(4, 40)');
EXECUTE prepared_insert('comment-5', '(5, 50)');
EXECUTE prepared_insert('comment-6', '(6, 60)');
SELECT * FROM router_executor_table ORDER BY comment;
 id |  comment  | stats
----+-----------+--------
  1 | comment-1 | (1,10)
  1 | comment-2 | (2,20)
  1 | comment-3 | (3,30)
  1 | comment-4 | (4,40)
  1 | comment-5 | (5,50)
  1 | comment-6 | (6,60)
(6 rows)

-- test parameterized selects
PREPARE prepared_select(integer, integer) AS
    SELECT count(*) FROM router_executor_table
        WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type;
EXECUTE prepared_select(1, 10);
 count
-------
     1
(1 row)

EXECUTE prepared_select(2, 20);
 count
-------
     1
(1 row)

EXECUTE prepared_select(3, 30);
 count
-------
     1
(1 row)

EXECUTE prepared_select(4, 40);
 count
-------
     1
(1 row)

EXECUTE prepared_select(5, 50);
 count
-------
     1
(1 row)

EXECUTE prepared_select(6, 60);
 count
-------
     1
(1 row)

-- test that we don't crash on failing parameterized insert on the partition column
PREPARE prepared_partition_column_insert(bigint) AS
    INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');
-- we error out on the 6th execution
EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3);
EXECUTE prepared_partition_column_insert(4);
EXECUTE prepared_partition_column_insert(5);
EXECUTE prepared_partition_column_insert(6);
ERROR: values given for the partition column must be constants or constant expressions
DROP TYPE test_composite_type CASCADE;
NOTICE: drop cascades to table router_executor_table column stats
-- clean-up prepared statements
DEALLOCATE ALL;
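A note on the "-- we error out on the 6th execution" comment above: PostgreSQL's plan cache builds custom plans for the first five executions of a prepared statement, folding the supplied values in as constants, and from the sixth execution it may switch to a generic plan that keeps $1 as a bare parameter. That is consistent with the partition-column insert only failing on the sixth EXECUTE with "values given for the partition column must be constants or constant expressions": once the partition value is no longer a constant the router planner cannot pick a shard, whereas parameters on non-partition columns are now forwarded to the worker by the executor changes above.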
@@ -1041,7 +1041,6 @@ DEBUG: Plan is router executable
(5 rows)

-- parametric prepare queries can be router plannable
-- it will be fixed with another pr
PREPARE author_articles(int) as
    SELECT *
    FROM articles_hash

@@ -1050,9 +1049,15 @@ EXECUTE author_articles(1);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 840001
DEBUG: Plan is router executable
WARNING: there is no parameter $1
CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
 id | author_id |    title     | word_count
----+-----------+--------------+------------
  1 |         1 | arsenous     |       9572
 11 |         1 | alamo        |       1347
 21 |         1 | arcading     |       5890
 31 |         1 | athwartships |       7271
 41 |         1 | aznavour     |      11814
(5 rows)

-- queries inside plpgsql functions could be router plannable
CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$
DECLARE
@@ -60,11 +60,9 @@ SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
\gset

-- insert to proxy, again relying on default value
-- will be fixed with another PR
-- INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);

-- test copy with bad row in middle
-- will be fixed with another PR
\set VERBOSITY terse
COPY pg_temp.:"proxy_tablename" FROM stdin;
2	dolor sit amet

@@ -78,11 +76,9 @@ COPY pg_temp.:"proxy_tablename" FROM stdin;
\set VERBOSITY default

-- verify rows were copied to distributed table
-- will be fixed with another PR
SELECT * FROM insert_target ORDER BY id ASC;

-- the counter should match the number of rows stored
-- will be fixed with another PR
SELECT currval('rows_inserted');

SET client_min_messages TO DEFAULT;
@@ -152,5 +152,62 @@ EXECUTE prepared_test_6(155);
-- FIXME: temporarily disabled
-- EXECUTE prepared_test_6(1555);

-- test router executor with parameterized non-partition columns

-- create a custom type which also exists on worker nodes
CREATE TYPE test_composite_type AS (
    i integer,
    i2 integer
);

CREATE TABLE router_executor_table (
    id bigint NOT NULL,
    comment varchar(20),
    stats test_composite_type
);

SELECT master_create_distributed_table('router_executor_table', 'id', 'hash');
SELECT master_create_worker_shards('router_executor_table', 2, 2);

-- test parameterized inserts
PREPARE prepared_insert(varchar(20)) AS
    INSERT INTO router_executor_table VALUES (1, $1, $2);

EXECUTE prepared_insert('comment-1', '(1, 10)');
EXECUTE prepared_insert('comment-2', '(2, 20)');
EXECUTE prepared_insert('comment-3', '(3, 30)');
EXECUTE prepared_insert('comment-4', '(4, 40)');
EXECUTE prepared_insert('comment-5', '(5, 50)');
EXECUTE prepared_insert('comment-6', '(6, 60)');

SELECT * FROM router_executor_table ORDER BY comment;

-- test parameterized selects
PREPARE prepared_select(integer, integer) AS
    SELECT count(*) FROM router_executor_table
        WHERE id = 1 AND stats = ROW($1, $2)::test_composite_type;

EXECUTE prepared_select(1, 10);
EXECUTE prepared_select(2, 20);
EXECUTE prepared_select(3, 30);
EXECUTE prepared_select(4, 40);
EXECUTE prepared_select(5, 50);
EXECUTE prepared_select(6, 60);

-- test that we don't crash on failing parameterized insert on the partition column

PREPARE prepared_partition_column_insert(bigint) AS
    INSERT INTO router_executor_table VALUES ($1, 'arsenous', '(1,10)');

-- we error out on the 6th execution
EXECUTE prepared_partition_column_insert(1);
EXECUTE prepared_partition_column_insert(2);
EXECUTE prepared_partition_column_insert(3);
EXECUTE prepared_partition_column_insert(4);
EXECUTE prepared_partition_column_insert(5);
EXECUTE prepared_partition_column_insert(6);

DROP TYPE test_composite_type CASCADE;

-- clean-up prepared statements
DEALLOCATE ALL;
@@ -475,7 +475,6 @@ PREPARE author_1_articles as
EXECUTE author_1_articles;

-- parametric prepare queries can be router plannable
-- it will be fixed with another pr
PREPARE author_articles(int) as
    SELECT *
    FROM articles_hash