Pushdown procedures with OUT parameters (#5348)

onder_planner_tmp
Teja Mupparti 2021-10-11 23:14:36 -07:00 committed by GitHub
parent 877d21d3f4
commit a8348047c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 966 additions and 98 deletions

View File

@ -49,9 +49,6 @@
static bool CallFuncExprRemotely(CallStmt *callStmt,
DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, DestReceiver *dest);
#if PG_VERSION_NUM >= PG_VERSION_14
static bool FunctionHasOutOnlyParameter(Oid functionOid);
#endif
/*
* CallDistributedProcedureRemotely calls a stored procedure on the worker if possible.
@ -117,8 +114,19 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
}
else
{
List *argumentList = NIL;
List *namedArgList;
int numberOfArgs;
Oid *argumentTypes;
if (!get_merged_argument_list(callStmt, &namedArgList, &argumentTypes,
&argumentList, &numberOfArgs))
{
argumentList = funcExpr->args;
}
placement =
ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr,
ShardPlacementForFunctionColocatedWithDistTable(procedure, argumentList,
partitionColumn, distTable,
NULL);
}
@ -148,26 +156,12 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_14
/*
* We might need to add outargs to the funcExpr->args so that they can
* be pushed down. We can implement in the future.
*/
if (FunctionHasOutOnlyParameter(funcExpr->funcid))
{
ereport(DEBUG1, (errmsg("not pushing down procedures with OUT parameters")));
return false;
}
#endif
ereport(DEBUG1, (errmsg("pushing down the procedure")));
/* build remote command with fully qualified names */
StringInfo callCommand = makeStringInfo();
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr));
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt));
{
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
@ -227,53 +221,3 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return true;
}
#if PG_VERSION_NUM >= PG_VERSION_14
/*
* FunctionHasOutOnlyParameter is a helper function which takes
* a function Oid and returns true if the input function has at least
* one OUT parameter.
*/
static bool
FunctionHasOutOnlyParameter(Oid functionOid)
{
Oid *argTypes = NULL;
char **argNames = NULL;
char *argModes = NULL;
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "cache lookup failed for function %u", functionOid);
}
int numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes);
if (argModes == NULL)
{
/* short circuit, all arguments are IN */
ReleaseSysCache(proctup);
return false;
}
int argIndex = 0;
for (; argIndex < numberOfArgs; ++argIndex)
{
if (argModes[argIndex] == PROARGMODE_OUT)
{
ReleaseSysCache(proctup);
return true;
}
}
ReleaseSysCache(proctup);
return false;
}
#endif

View File

@ -460,6 +460,19 @@ pg_get_query_def(Query *query, StringInfo buffer)
get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0);
}
/*
* get_merged_argument_list merges both IN and OUT arguments lists into one and also
* eliminates the INOUT duplicates(present in both the lists).
*/
bool
get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList,
Oid **mergedNamedArgTypes,
List **mergedArgumentList,
int *totalArguments)
{
/* No OUT argument support in Postgres 12 */
return false;
}
/*
* pg_get_rule_expr deparses an expression and returns the result as a string.
@ -6012,6 +6025,10 @@ get_rule_expr(Node *node, deparse_context *context,
get_tablefunc((TableFunc *) node, context, showimplicit);
break;
case T_CallStmt:
get_func_expr(((CallStmt *) node)->funcexpr, context, showimplicit);
break;
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;

View File

@ -465,6 +465,19 @@ pg_get_query_def(Query *query, StringInfo buffer)
get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0);
}
/*
* get_merged_argument_list merges both IN and OUT arguments lists into one and also
* eliminates the INOUT duplicates(present in both the lists).
*/
bool
get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList,
Oid **mergedNamedArgTypes,
List **mergedArgumentList,
int *totalArguments)
{
/* No OUT argument support in Postgres 13 */
return false;
}
/*
* pg_get_rule_expr deparses an expression and returns the result as a string.
@ -6068,6 +6081,10 @@ get_rule_expr(Node *node, deparse_context *context,
get_tablefunc((TableFunc *) node, context, showimplicit);
break;
case T_CallStmt:
get_func_expr(((CallStmt *) node)->funcexpr, context, showimplicit);
break;
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;

View File

@ -413,6 +413,8 @@ static bool looks_like_function(Node *node);
static void get_oper_expr(OpExpr *expr, deparse_context *context);
static void get_func_expr(FuncExpr *expr, deparse_context *context,
bool showimplicit);
static void get_proc_expr(CallStmt *stmt, deparse_context *context,
bool showimplicit);
static void get_agg_expr(Aggref *aggref, deparse_context *context,
Aggref *original_aggref);
static void get_agg_combine_expr(Node *node, deparse_context *context,
@ -472,7 +474,142 @@ pg_get_query_def(Query *query, StringInfo buffer)
get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0);
}
/*
* get_merged_argument_list merges both the IN and OUT arguments lists into one and
* also eliminates the INOUT duplicates(present in both the lists). After merging both
* the lists, it returns all the named-arguments in a list(mergedNamedArgList) along
* with their types(mergedNamedArgTypes), final argument list(mergedArgumentList), and
* the total number of arguments(totalArguments).
*/
bool
get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList,
Oid **mergedNamedArgTypes,
List **mergedArgumentList,
int *totalArguments)
{
Oid functionOid = stmt->funcexpr->funcid;
List *namedArgList = NIL;
List *finalArgumentList = NIL;
Oid finalArgTypes[FUNC_MAX_ARGS];
Oid *argTypes = NULL;
char *argModes = NULL;
char **argNames = NULL;
int argIndex = 0;
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "cache lookup failed for function %u", functionOid);
}
int defArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes);
ReleaseSysCache(proctup);
if (argModes == NULL)
{
/* No OUT arguments */
return false;
}
/*
* Passed arguments Includes IN, OUT, INOUT (in both the lists) and VARIADIC arguments,
* which means INOUT arguments are double counted.
*/
int numberOfArgs = list_length(stmt->funcexpr->args) + list_length(stmt->outargs);
int totalInoutArgs = 0;
/* Let's count INOUT arguments from the defined number of arguments */
for (argIndex=0; argIndex < defArgs; ++argIndex)
{
if (argModes[argIndex] == PROARGMODE_INOUT)
totalInoutArgs++;
}
/* Remove the duplicate INOUT counting */
numberOfArgs = numberOfArgs - totalInoutArgs;
ListCell *inArgCell = list_head(stmt->funcexpr->args);
ListCell *outArgCell = list_head(stmt->outargs);
for (argIndex=0; argIndex < numberOfArgs; ++argIndex)
{
switch (argModes[argIndex])
{
case PROARGMODE_IN:
case PROARGMODE_VARIADIC:
{
Node *arg = (Node *) lfirst(inArgCell);
if (IsA(arg, NamedArgExpr))
namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name);
finalArgTypes[argIndex] = exprType(arg);
finalArgumentList = lappend(finalArgumentList, arg);
inArgCell = lnext(stmt->funcexpr->args, inArgCell);
break;
}
case PROARGMODE_OUT:
{
Node *arg = (Node *) lfirst(outArgCell);
if (IsA(arg, NamedArgExpr))
namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name);
finalArgTypes[argIndex] = exprType(arg);
finalArgumentList = lappend(finalArgumentList, arg);
outArgCell = lnext(stmt->outargs, outArgCell);
break;
}
case PROARGMODE_INOUT:
{
Node *arg = (Node *) lfirst(inArgCell);
if (IsA(arg, NamedArgExpr))
namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name);
finalArgTypes[argIndex] = exprType(arg);
finalArgumentList = lappend(finalArgumentList, arg);
inArgCell = lnext(stmt->funcexpr->args, inArgCell);
outArgCell = lnext(stmt->outargs, outArgCell);
break;
}
case PROARGMODE_TABLE:
default:
{
elog(ERROR, "Unhandled procedure argument mode[%d]", argModes[argIndex]);
break;
}
}
}
/*
* After eliminating INOUT duplicates and merging OUT arguments, we now
* have the final list of arguments.
*/
if (defArgs != list_length(finalArgumentList))
{
elog(ERROR, "Insufficient number of args passed[%d] for function[%s]",
list_length(finalArgumentList),
get_func_name(functionOid));
}
if (list_length(finalArgumentList) > FUNC_MAX_ARGS)
{
ereport(ERROR,
(errcode(ERRCODE_TOO_MANY_ARGUMENTS),
errmsg("too many arguments[%d] for function[%s]",
list_length(finalArgumentList),
get_func_name(functionOid))));
}
*mergedNamedArgList = namedArgList;
*mergedNamedArgTypes = finalArgTypes;
*mergedArgumentList = finalArgumentList;
*totalArguments = numberOfArgs;
return true;
}
/*
* pg_get_rule_expr deparses an expression and returns the result as a string.
*/
@ -6216,6 +6353,10 @@ get_rule_expr(Node *node, deparse_context *context,
get_tablefunc((TableFunc *) node, context, showimplicit);
break;
case T_CallStmt:
get_proc_expr((CallStmt *) node, context, showimplicit);
break;
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
@ -6437,6 +6578,52 @@ get_func_expr(FuncExpr *expr, deparse_context *context,
appendStringInfoString(buf, "VARIADIC ");
get_rule_expr((Node *) lfirst(l), context, true);
}
appendStringInfoChar(buf, ')');
}
/*
* get_proc_expr - Parse back a CallStmt node
*/
static void
get_proc_expr(CallStmt *stmt, deparse_context *context,
bool showimplicit)
{
StringInfo buf = context->buf;
Oid functionOid = stmt->funcexpr->funcid;
bool use_variadic;
Oid *argumentTypes;
List *finalArgumentList = NIL;
ListCell *argumentCell;
List *namedArgList = NIL;
int numberOfArgs = -1;
if (!get_merged_argument_list(stmt, &namedArgList, &argumentTypes,
&finalArgumentList, &numberOfArgs))
{
/* Nothing merged i.e. no OUT arguments */
get_func_expr((FuncExpr *) stmt->funcexpr, context, showimplicit);
return;
}
appendStringInfo(buf, "%s(",
generate_function_name(functionOid, numberOfArgs,
namedArgList, argumentTypes,
stmt->funcexpr->funcvariadic,
&use_variadic,
context->special_exprkind));
int argNumber = 0;
foreach(argumentCell, finalArgumentList)
{
if (argNumber++ > 0)
appendStringInfoString(buf, ", ");
if (use_variadic && lnext(finalArgumentList, argumentCell) == NULL)
appendStringInfoString(buf, "VARIADIC ");
get_rule_expr((Node *) lfirst(argumentCell), context, true);
argNumber++;
}
appendStringInfoChar(buf, ')');
}

View File

@ -265,7 +265,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
}
else
{
placement = ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr,
placement = ShardPlacementForFunctionColocatedWithDistTable(procedure,
funcExpr->args,
partitionColumn,
distTable,
planContext->plan);
@ -346,19 +347,19 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
*/
ShardPlacement *
ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure,
FuncExpr *funcExpr,
List *argumentList,
Var *partitionColumn,
CitusTableCacheEntry *cacheEntry,
PlannedStmt *plan)
{
if (procedure->distributionArgIndex < 0 ||
procedure->distributionArgIndex >= list_length(funcExpr->args))
procedure->distributionArgIndex >= list_length(argumentList))
{
ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index")));
return NULL;
}
Node *partitionValueNode = (Node *) list_nth(funcExpr->args,
Node *partitionValueNode = (Node *) list_nth(argumentList,
procedure->distributionArgIndex);
partitionValueNode = strip_implicit_coercions(partitionValueNode);

View File

@ -48,6 +48,9 @@ extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);
bool get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList,
Oid **mergedNamedArgTypes, List **mergedArgumentList,
int *totalArguments);
char * pg_get_rule_expr(Node *expression);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer);

View File

@ -185,11 +185,10 @@ extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
extern ShardPlacement * ShardPlacementForFunctionColocatedWithReferenceTable(
CitusTableCacheEntry *cacheEntry);
extern ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable(
DistObjectCacheEntry *procedure, FuncExpr *funcExpr, Var *partitionColumn,
DistObjectCacheEntry *procedure, List *argumentList, Var *partitionColumn,
CitusTableCacheEntry
*cacheEntry,
PlannedStmt *plan);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);

View File

@ -269,6 +269,351 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla
RESET citus.multi_shard_modify_mode;
-- test procedure OUT parameters with procedure pushdown
CREATE TABLE prctbl(val int primary key);
CREATE OR REPLACE PROCEDURE insert_data(arg1 integer)
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Proc with no OUT args';
INSERT INTO pg14.prctbl VALUES (arg1);
END;
$$;
CREATE PROCEDURE insert_data_out(val integer, OUT res text)
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Proc with OUT args';
INSERT INTO pg14.prctbl VALUES (val);
res := 'insert_data_out():proc-result'::text;
END
$$;
CREATE FUNCTION insert_data_out_fn(val integer, OUT res text)
RETURNS TEXT
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Func with OUT args';
INSERT INTO pg14.prctbl VALUES (val);
res := 'insert_data_out_fn():func-result'::text;
END;
$$;
CREATE OR REPLACE PROCEDURE proc_varargs(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
VARIADIC list NUMERIC[])
AS $$
BEGIN
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
-- Named arguments
CREATE OR REPLACE PROCEDURE proc_namedargs(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT)
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result; total := 999; average := 99;
END; $$
LANGUAGE plpgsql;
-- Mix of IN, OUT, INOUT and Variadic
CREATE OR REPLACE PROCEDURE proc_namedargs_var(
IN inp1 INT,
IN inp2 INT,
INOUT inout1 TEXT,
OUT out1 INT,
VARIADIC list INT[])
AS $$
DECLARE sum INT;
BEGIN
out1 := 5;
SELECT INTO sum SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
RAISE NOTICE 'Input-1: % Input-2: % VarSum: %', inp1, inp2, sum;
SELECT 'Final : ' || inout1 INTO inout1;
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout2(
INOUT result TEXT,
OUT total NUMERIC,
OUT average NUMERIC,
IN inp INT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout3(
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT,
IN inp INT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
-- Function overload
CREATE PROCEDURE proc_namedargs_overload(
IN inp INT)
AS $$
BEGIN
RAISE NOTICE 'IN passed INT: %', inp;
END; $$
LANGUAGE plpgsql;
CREATE PROCEDURE proc_namedargs_overload(
IN inp NUMERIC)
AS $$
BEGIN
RAISE NOTICE 'IN passed NUMERIC: %', inp;
END; $$
LANGUAGE plpgsql;
-- Before distribution
CALL insert_data(1);
NOTICE: Proc with no OUT args
CONTEXT: PL/pgSQL function insert_data(integer) line XX at RAISE
CALL insert_data_out(2, 'whythisarg');
NOTICE: Proc with OUT args
CONTEXT: PL/pgSQL function insert_data_out(integer) line XX at RAISE
res
---------------------------------------------------------------------
insert_data_out():proc-result
(1 row)
SELECT insert_data_out_fn(3);
NOTICE: Func with OUT args
CONTEXT: PL/pgSQL function insert_data_out_fn(integer) line XX at RAISE
insert_data_out_fn
---------------------------------------------------------------------
insert_data_out_fn():func-result
(1 row)
-- Return the average and the sum of 2, 8, 20
CALL proc_varargs(1, 1, 1, 2, 8, 20);
total | average
---------------------------------------------------------------------
30 | 10.0000000000000000
(1 row)
CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20);
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:Testing in/out/var arguments
(1 row)
CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20);
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:99
(1 row)
CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20);
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:TEST COERCE_SQL_SYNTAX
(1 row)
CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4);
NOTICE: IN passed: 4
CONTEXT: PL/pgSQL function proc_namedargs(integer,text) line XX at RAISE
total | average | result
---------------------------------------------------------------------
999 | 99 | Final:Named args
(1 row)
CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}');
NOTICE: Input-1: 1 Input-2: 2 VarSum: 27
CONTEXT: PL/pgSQL function proc_namedargs_var(integer,integer,text,integer[]) line XX at RAISE
inout1 | out1
---------------------------------------------------------------------
Final : INOUT third argument | 5
(1 row)
CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20);
NOTICE: IN passed: 5
CONTEXT: PL/pgSQL function proc_varargs_inout2(text,integer,numeric[]) line XX at RAISE
result | total | average
---------------------------------------------------------------------
Final:In Out | 30 | 10.0000000000000000
(1 row)
CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20);
NOTICE: IN passed: 6
CONTEXT: PL/pgSQL function proc_varargs_inout3(text,integer,numeric[]) line XX at RAISE
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:In Out
(1 row)
CALL proc_namedargs_overload(3);
NOTICE: IN passed INT: 3
CONTEXT: PL/pgSQL function proc_namedargs_overload(integer) line XX at RAISE
CALL proc_namedargs_overload(4.0);
NOTICE: IN passed NUMERIC: 4.0
CONTEXT: PL/pgSQL function proc_namedargs_overload(numeric) line XX at RAISE
CALL proc_namedargs_overload(inp=>5);
NOTICE: IN passed INT: 5
CONTEXT: PL/pgSQL function proc_namedargs_overload(integer) line XX at RAISE
CALL proc_namedargs_overload(inp=>6.0);
NOTICE: IN passed NUMERIC: 6.0
CONTEXT: PL/pgSQL function proc_namedargs_overload(numeric) line XX at RAISE
-- Distribute the table, procedure and function
SELECT create_distributed_table('prctbl', 'val', colocate_with := 'none');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.prctbl$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'insert_data(int)', 'arg1',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'insert_data_out(int)', 'val',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'insert_data_out_fn(int)', 'val',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_varargs(int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_varargs_inout(int, text, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_namedargs(int, text)', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_namedargs_var(int, int, text, int[])', 'inp1',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_varargs_inout2(text, int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_varargs_inout3(text, int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_namedargs_overload(int)', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'proc_namedargs_overload(numeric)', 'inp',
colocate_with := 'prctbl'
);
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE TABLE test_proc_table (a int);
create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text)
language plpgsql
@ -296,22 +641,6 @@ CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
{2} | 2
(1 row)
SELECT create_distributed_table('test_proc_table', 'a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- make sure that metadata is synced, it may take few seconds
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
@ -329,22 +658,141 @@ SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port
t
(1 row)
-- still, we do not pushdown procedures with OUT parameters
SELECT create_distributed_table('test_proc_table', 'a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- pushdown procedures with OUT parameters
SET client_min_messages TO DEBUG1;
CALL proc_pushdown(1, NULL, NULL);
DEBUG: not pushing down procedures with OUT parameters
DEBUG: pushing down the procedure
created | res_out
---------------------------------------------------------------------
{3} | 3
(1 row)
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
DEBUG: not pushing down procedures with OUT parameters
DEBUG: pushing down the procedure
created | res_out
---------------------------------------------------------------------
{4} | 4
(1 row)
CALL insert_data(4);
DEBUG: pushing down the procedure
NOTICE: Proc with no OUT args
DETAIL: from localhost:xxxxx
CALL insert_data_out(5, 'whythisarg');
DEBUG: pushing down the procedure
NOTICE: Proc with OUT args
DETAIL: from localhost:xxxxx
res
---------------------------------------------------------------------
insert_data_out():proc-result
(1 row)
SELECT insert_data_out_fn(6);
DEBUG: pushing down the function call
NOTICE: Func with OUT args
DETAIL: from localhost:xxxxx
insert_data_out_fn
---------------------------------------------------------------------
insert_data_out_fn():func-result
(1 row)
-- Return the average and the sum of 2, 8, 20
CALL proc_varargs(1, 1, 1, 2, 8, 20);
DEBUG: pushing down the procedure
total | average
---------------------------------------------------------------------
30 | 10.0000000000000000
(1 row)
CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20);
DEBUG: pushing down the procedure
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:Testing in/out/var arguments
(1 row)
CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20);
DEBUG: pushing down the procedure
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:99
(1 row)
CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20);
DEBUG: pushing down the procedure
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:TEST COERCE_SQL_SYNTAX
(1 row)
CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4);
DEBUG: pushing down the procedure
NOTICE: IN passed: 4
DETAIL: from localhost:xxxxx
total | average | result
---------------------------------------------------------------------
999 | 99 | Final:Named args
(1 row)
CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}');
DEBUG: pushing down the procedure
NOTICE: Input-1: 1 Input-2: 2 VarSum: 27
DETAIL: from localhost:xxxxx
inout1 | out1
---------------------------------------------------------------------
Final : INOUT third argument | 5
(1 row)
CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20);
DEBUG: pushing down the procedure
NOTICE: IN passed: 5
DETAIL: from localhost:xxxxx
result | total | average
---------------------------------------------------------------------
Final:In Out | 30 | 10.0000000000000000
(1 row)
CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20);
DEBUG: pushing down the procedure
NOTICE: IN passed: 6
DETAIL: from localhost:xxxxx
total | average | result
---------------------------------------------------------------------
30 | 10.0000000000000000 | Final:In Out
(1 row)
CALL proc_namedargs_overload(3);
DEBUG: pushing down the procedure
NOTICE: IN passed INT: 3
DETAIL: from localhost:xxxxx
CALL proc_namedargs_overload(4.0);
DEBUG: pushing down the procedure
NOTICE: IN passed NUMERIC: 4.0
DETAIL: from localhost:xxxxx
CALL proc_namedargs_overload(inp=>5);
DEBUG: pushing down the procedure
NOTICE: IN passed INT: 5
DETAIL: from localhost:xxxxx
CALL proc_namedargs_overload(inp=>6.0);
DEBUG: pushing down the procedure
NOTICE: IN passed NUMERIC: 6.0
DETAIL: from localhost:xxxxx
RESET client_min_messages;
-- we don't need metadata syncing anymore
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -105,6 +105,242 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla
RESET citus.multi_shard_modify_mode;
-- test procedure OUT parameters with procedure pushdown
CREATE TABLE prctbl(val int primary key);
CREATE OR REPLACE PROCEDURE insert_data(arg1 integer)
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Proc with no OUT args';
INSERT INTO pg14.prctbl VALUES (arg1);
END;
$$;
CREATE PROCEDURE insert_data_out(val integer, OUT res text)
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Proc with OUT args';
INSERT INTO pg14.prctbl VALUES (val);
res := 'insert_data_out():proc-result'::text;
END
$$;
CREATE FUNCTION insert_data_out_fn(val integer, OUT res text)
RETURNS TEXT
LANGUAGE PLPGSQL
AS $$
BEGIN
RAISE NOTICE 'Func with OUT args';
INSERT INTO pg14.prctbl VALUES (val);
res := 'insert_data_out_fn():func-result'::text;
END;
$$;
CREATE OR REPLACE PROCEDURE proc_varargs(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
VARIADIC list NUMERIC[])
AS $$
BEGIN
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
-- Named arguments
CREATE OR REPLACE PROCEDURE proc_namedargs(
IN inp INT,
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT)
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result; total := 999; average := 99;
END; $$
LANGUAGE plpgsql;
-- Mix of IN, OUT, INOUT and Variadic
CREATE OR REPLACE PROCEDURE proc_namedargs_var(
IN inp1 INT,
IN inp2 INT,
INOUT inout1 TEXT,
OUT out1 INT,
VARIADIC list INT[])
AS $$
DECLARE sum INT;
BEGIN
out1 := 5;
SELECT INTO sum SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
RAISE NOTICE 'Input-1: % Input-2: % VarSum: %', inp1, inp2, sum;
SELECT 'Final : ' || inout1 INTO inout1;
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout2(
INOUT result TEXT,
OUT total NUMERIC,
OUT average NUMERIC,
IN inp INT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE proc_varargs_inout3(
OUT total NUMERIC,
OUT average NUMERIC,
INOUT result TEXT,
IN inp INT,
VARIADIC list NUMERIC[])
AS $$
BEGIN
RAISE NOTICE 'IN passed: %', inp;
SELECT 'Final:' || result INTO result;
SELECT INTO total SUM(list[i])
FROM generate_subscripts(list, 1) g(i);
SELECT INTO average AVG(list[i])
FROM generate_subscripts(list, 1) g(i);
END; $$
LANGUAGE plpgsql;
-- Function overload
CREATE PROCEDURE proc_namedargs_overload(
IN inp INT)
AS $$
BEGIN
RAISE NOTICE 'IN passed INT: %', inp;
END; $$
LANGUAGE plpgsql;
CREATE PROCEDURE proc_namedargs_overload(
IN inp NUMERIC)
AS $$
BEGIN
RAISE NOTICE 'IN passed NUMERIC: %', inp;
END; $$
LANGUAGE plpgsql;
-- Before distribution
CALL insert_data(1);
CALL insert_data_out(2, 'whythisarg');
SELECT insert_data_out_fn(3);
-- Return the average and the sum of 2, 8, 20
CALL proc_varargs(1, 1, 1, 2, 8, 20);
CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20);
CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20);
CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20);
CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4);
CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}');
CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20);
CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20);
CALL proc_namedargs_overload(3);
CALL proc_namedargs_overload(4.0);
CALL proc_namedargs_overload(inp=>5);
CALL proc_namedargs_overload(inp=>6.0);
-- Distribute the table, procedure and function
SELECT create_distributed_table('prctbl', 'val', colocate_with := 'none');
SELECT create_distributed_function(
'insert_data(int)', 'arg1',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'insert_data_out(int)', 'val',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'insert_data_out_fn(int)', 'val',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_varargs(int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_varargs_inout(int, text, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_namedargs(int, text)', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_namedargs_var(int, int, text, int[])', 'inp1',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_varargs_inout2(text, int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_varargs_inout3(text, int, NUMERIC[])', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_namedargs_overload(int)', 'inp',
colocate_with := 'prctbl'
);
SELECT create_distributed_function(
'proc_namedargs_overload(numeric)', 'inp',
colocate_with := 'prctbl'
);
CREATE TABLE test_proc_table (a int);
create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text)
@ -125,9 +361,6 @@ end;$$;
CALL proc_pushdown(1, NULL, NULL);
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
SELECT create_distributed_table('test_proc_table', 'a');
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
-- make sure that metadata is synced, it may take few seconds
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
@ -136,10 +369,29 @@ CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 1500
SELECT wait_until_metadata_sync(30000);
SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port);
-- still, we do not pushdown procedures with OUT parameters
SELECT create_distributed_table('test_proc_table', 'a');
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
-- pushdown procedures with OUT parameters
SET client_min_messages TO DEBUG1;
CALL proc_pushdown(1, NULL, NULL);
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
CALL insert_data(4);
CALL insert_data_out(5, 'whythisarg');
SELECT insert_data_out_fn(6);
-- Return the average and the sum of 2, 8, 20
CALL proc_varargs(1, 1, 1, 2, 8, 20);
CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20);
CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20);
CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20);
CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4);
CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}');
CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20);
CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20);
CALL proc_namedargs_overload(3);
CALL proc_namedargs_overload(4.0);
CALL proc_namedargs_overload(inp=>5);
CALL proc_namedargs_overload(inp=>6.0);
RESET client_min_messages;
-- we don't need metadata syncing anymore