diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 98add3f28..29c62e58a 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -49,9 +49,6 @@ static bool CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, FuncExpr *funcExpr, DestReceiver *dest); -#if PG_VERSION_NUM >= PG_VERSION_14 -static bool FunctionHasOutOnlyParameter(Oid functionOid); -#endif /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. @@ -117,8 +114,19 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, } else { + List *argumentList = NIL; + List *namedArgList; + int numberOfArgs; + Oid *argumentTypes; + + if (!get_merged_argument_list(callStmt, &namedArgList, &argumentTypes, + &argumentList, &numberOfArgs)) + { + argumentList = funcExpr->args; + } + placement = - ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr, + ShardPlacementForFunctionColocatedWithDistTable(procedure, argumentList, partitionColumn, distTable, NULL); } @@ -148,26 +156,12 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return false; } - -#if PG_VERSION_NUM >= PG_VERSION_14 - - /* - * We might need to add outargs to the funcExpr->args so that they can - * be pushed down. We can implement in the future. - */ - if (FunctionHasOutOnlyParameter(funcExpr->funcid)) - { - ereport(DEBUG1, (errmsg("not pushing down procedures with OUT parameters"))); - return false; - } -#endif - ereport(DEBUG1, (errmsg("pushing down the procedure"))); /* build remote command with fully qualified names */ StringInfo callCommand = makeStringInfo(); - appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr)); + appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt)); { Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); TupleDesc tupleDesc = CallStmtResultDesc(callStmt); @@ -227,53 +221,3 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return true; } - - -#if PG_VERSION_NUM >= PG_VERSION_14 - -/* - * FunctionHasOutOnlyParameter is a helper function which takes - * a function Oid and returns true if the input function has at least - * one OUT parameter. - */ -static bool -FunctionHasOutOnlyParameter(Oid functionOid) -{ - Oid *argTypes = NULL; - char **argNames = NULL; - char *argModes = NULL; - - HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); - if (!HeapTupleIsValid(proctup)) - { - elog(ERROR, "cache lookup failed for function %u", functionOid); - } - - int numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes); - - if (argModes == NULL) - { - /* short circuit, all arguments are IN */ - ReleaseSysCache(proctup); - - return false; - } - - int argIndex = 0; - for (; argIndex < numberOfArgs; ++argIndex) - { - if (argModes[argIndex] == PROARGMODE_OUT) - { - ReleaseSysCache(proctup); - - return true; - } - } - - ReleaseSysCache(proctup); - - return false; -} - - -#endif diff --git a/src/backend/distributed/deparser/ruleutils_12.c b/src/backend/distributed/deparser/ruleutils_12.c index 0088e6b53..bd0c752b4 100644 --- a/src/backend/distributed/deparser/ruleutils_12.c +++ b/src/backend/distributed/deparser/ruleutils_12.c @@ -460,6 +460,19 @@ pg_get_query_def(Query *query, StringInfo buffer) get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0); } +/* + * get_merged_argument_list merges both IN and OUT arguments lists into one and also + * eliminates the INOUT duplicates(present in both the lists). + */ +bool +get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList, + Oid **mergedNamedArgTypes, + List **mergedArgumentList, + int *totalArguments) +{ + /* No OUT argument support in Postgres 12 */ + return false; +} /* * pg_get_rule_expr deparses an expression and returns the result as a string. @@ -6012,6 +6025,10 @@ get_rule_expr(Node *node, deparse_context *context, get_tablefunc((TableFunc *) node, context, showimplicit); break; + case T_CallStmt: + get_func_expr(((CallStmt *) node)->funcexpr, context, showimplicit); + break; + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c index f33da22e5..dca081653 100644 --- a/src/backend/distributed/deparser/ruleutils_13.c +++ b/src/backend/distributed/deparser/ruleutils_13.c @@ -465,6 +465,19 @@ pg_get_query_def(Query *query, StringInfo buffer) get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0); } +/* + * get_merged_argument_list merges both IN and OUT arguments lists into one and also + * eliminates the INOUT duplicates(present in both the lists). + */ +bool +get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList, + Oid **mergedNamedArgTypes, + List **mergedArgumentList, + int *totalArguments) +{ + /* No OUT argument support in Postgres 13 */ + return false; +} /* * pg_get_rule_expr deparses an expression and returns the result as a string. @@ -6068,6 +6081,10 @@ get_rule_expr(Node *node, deparse_context *context, get_tablefunc((TableFunc *) node, context, showimplicit); break; + case T_CallStmt: + get_func_expr(((CallStmt *) node)->funcexpr, context, showimplicit); + break; + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index aa1fffe94..2c0fe5198 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -413,6 +413,8 @@ static bool looks_like_function(Node *node); static void get_oper_expr(OpExpr *expr, deparse_context *context); static void get_func_expr(FuncExpr *expr, deparse_context *context, bool showimplicit); +static void get_proc_expr(CallStmt *stmt, deparse_context *context, + bool showimplicit); static void get_agg_expr(Aggref *aggref, deparse_context *context, Aggref *original_aggref); static void get_agg_combine_expr(Node *node, deparse_context *context, @@ -472,7 +474,142 @@ pg_get_query_def(Query *query, StringInfo buffer) get_query_def(query, buffer, NIL, NULL, 0, WRAP_COLUMN_DEFAULT, 0); } +/* + * get_merged_argument_list merges both the IN and OUT arguments lists into one and + * also eliminates the INOUT duplicates(present in both the lists). After merging both + * the lists, it returns all the named-arguments in a list(mergedNamedArgList) along + * with their types(mergedNamedArgTypes), final argument list(mergedArgumentList), and + * the total number of arguments(totalArguments). + */ +bool +get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList, + Oid **mergedNamedArgTypes, + List **mergedArgumentList, + int *totalArguments) +{ + Oid functionOid = stmt->funcexpr->funcid; + List *namedArgList = NIL; + List *finalArgumentList = NIL; + Oid finalArgTypes[FUNC_MAX_ARGS]; + Oid *argTypes = NULL; + char *argModes = NULL; + char **argNames = NULL; + int argIndex = 0; + + HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + elog(ERROR, "cache lookup failed for function %u", functionOid); + } + + int defArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes); + ReleaseSysCache(proctup); + + if (argModes == NULL) + { + /* No OUT arguments */ + return false; + } + + /* + * Passed arguments Includes IN, OUT, INOUT (in both the lists) and VARIADIC arguments, + * which means INOUT arguments are double counted. + */ + int numberOfArgs = list_length(stmt->funcexpr->args) + list_length(stmt->outargs); + int totalInoutArgs = 0; + + /* Let's count INOUT arguments from the defined number of arguments */ + for (argIndex=0; argIndex < defArgs; ++argIndex) + { + if (argModes[argIndex] == PROARGMODE_INOUT) + totalInoutArgs++; + } + + /* Remove the duplicate INOUT counting */ + numberOfArgs = numberOfArgs - totalInoutArgs; + + ListCell *inArgCell = list_head(stmt->funcexpr->args); + ListCell *outArgCell = list_head(stmt->outargs); + + for (argIndex=0; argIndex < numberOfArgs; ++argIndex) + { + switch (argModes[argIndex]) + { + case PROARGMODE_IN: + case PROARGMODE_VARIADIC: + { + Node *arg = (Node *) lfirst(inArgCell); + + if (IsA(arg, NamedArgExpr)) + namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name); + finalArgTypes[argIndex] = exprType(arg); + finalArgumentList = lappend(finalArgumentList, arg); + inArgCell = lnext(stmt->funcexpr->args, inArgCell); + break; + } + + case PROARGMODE_OUT: + { + Node *arg = (Node *) lfirst(outArgCell); + + if (IsA(arg, NamedArgExpr)) + namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name); + finalArgTypes[argIndex] = exprType(arg); + finalArgumentList = lappend(finalArgumentList, arg); + outArgCell = lnext(stmt->outargs, outArgCell); + break; + } + + case PROARGMODE_INOUT: + { + Node *arg = (Node *) lfirst(inArgCell); + + if (IsA(arg, NamedArgExpr)) + namedArgList = lappend(namedArgList, ((NamedArgExpr *) arg)->name); + finalArgTypes[argIndex] = exprType(arg); + finalArgumentList = lappend(finalArgumentList, arg); + inArgCell = lnext(stmt->funcexpr->args, inArgCell); + outArgCell = lnext(stmt->outargs, outArgCell); + break; + } + + case PROARGMODE_TABLE: + default: + { + elog(ERROR, "Unhandled procedure argument mode[%d]", argModes[argIndex]); + break; + } + } + } + + /* + * After eliminating INOUT duplicates and merging OUT arguments, we now + * have the final list of arguments. + */ + if (defArgs != list_length(finalArgumentList)) + { + elog(ERROR, "Insufficient number of args passed[%d] for function[%s]", + list_length(finalArgumentList), + get_func_name(functionOid)); + } + + if (list_length(finalArgumentList) > FUNC_MAX_ARGS) + { + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_ARGUMENTS), + errmsg("too many arguments[%d] for function[%s]", + list_length(finalArgumentList), + get_func_name(functionOid)))); + } + + *mergedNamedArgList = namedArgList; + *mergedNamedArgTypes = finalArgTypes; + *mergedArgumentList = finalArgumentList; + *totalArguments = numberOfArgs; + + return true; +} /* * pg_get_rule_expr deparses an expression and returns the result as a string. */ @@ -6216,6 +6353,10 @@ get_rule_expr(Node *node, deparse_context *context, get_tablefunc((TableFunc *) node, context, showimplicit); break; + case T_CallStmt: + get_proc_expr((CallStmt *) node, context, showimplicit); + break; + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; @@ -6437,6 +6578,52 @@ get_func_expr(FuncExpr *expr, deparse_context *context, appendStringInfoString(buf, "VARIADIC "); get_rule_expr((Node *) lfirst(l), context, true); } + + appendStringInfoChar(buf, ')'); +} + + +/* + * get_proc_expr - Parse back a CallStmt node + */ +static void +get_proc_expr(CallStmt *stmt, deparse_context *context, + bool showimplicit) +{ + StringInfo buf = context->buf; + Oid functionOid = stmt->funcexpr->funcid; + bool use_variadic; + Oid *argumentTypes; + List *finalArgumentList = NIL; + ListCell *argumentCell; + List *namedArgList = NIL; + int numberOfArgs = -1; + + if (!get_merged_argument_list(stmt, &namedArgList, &argumentTypes, + &finalArgumentList, &numberOfArgs)) + { + /* Nothing merged i.e. no OUT arguments */ + get_func_expr((FuncExpr *) stmt->funcexpr, context, showimplicit); + return; + } + + appendStringInfo(buf, "%s(", + generate_function_name(functionOid, numberOfArgs, + namedArgList, argumentTypes, + stmt->funcexpr->funcvariadic, + &use_variadic, + context->special_exprkind)); + int argNumber = 0; + foreach(argumentCell, finalArgumentList) + { + if (argNumber++ > 0) + appendStringInfoString(buf, ", "); + if (use_variadic && lnext(finalArgumentList, argumentCell) == NULL) + appendStringInfoString(buf, "VARIADIC "); + get_rule_expr((Node *) lfirst(argumentCell), context, true); + argNumber++; + } + appendStringInfoChar(buf, ')'); } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 5e1ec0afd..83d7ad741 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -265,7 +265,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } else { - placement = ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr, + placement = ShardPlacementForFunctionColocatedWithDistTable(procedure, + funcExpr->args, partitionColumn, distTable, planContext->plan); @@ -346,19 +347,19 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) */ ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, + List *argumentList, Var *partitionColumn, CitusTableCacheEntry *cacheEntry, PlannedStmt *plan) { if (procedure->distributionArgIndex < 0 || - procedure->distributionArgIndex >= list_length(funcExpr->args)) + procedure->distributionArgIndex >= list_length(argumentList)) { ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index"))); return NULL; } - Node *partitionValueNode = (Node *) list_nth(funcExpr->args, + Node *partitionValueNode = (Node *) list_nth(argumentList, procedure->distributionArgIndex); partitionValueNode = strip_implicit_coercions(partitionValueNode); diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 946839181..f32c234ed 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -48,6 +48,9 @@ extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); +bool get_merged_argument_list(CallStmt *stmt, List **mergedNamedArgList, + Oid **mergedNamedArgTypes, List **mergedArgumentList, + int *totalArguments); char * pg_get_rule_expr(Node *expression); extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 519057ddd..74da3ad19 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -185,11 +185,10 @@ extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, extern ShardPlacement * ShardPlacementForFunctionColocatedWithReferenceTable( CitusTableCacheEntry *cacheEntry); extern ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable( - DistObjectCacheEntry *procedure, FuncExpr *funcExpr, Var *partitionColumn, + DistObjectCacheEntry *procedure, List *argumentList, Var *partitionColumn, CitusTableCacheEntry *cacheEntry, PlannedStmt *plan); - extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); extern bool CheckAvailableVersion(int elevel); diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index 9a6c40b3a..48f15d5c1 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -269,6 +269,351 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla RESET citus.multi_shard_modify_mode; -- test procedure OUT parameters with procedure pushdown +CREATE TABLE prctbl(val int primary key); +CREATE OR REPLACE PROCEDURE insert_data(arg1 integer) +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Proc with no OUT args'; +INSERT INTO pg14.prctbl VALUES (arg1); +END; +$$; +CREATE PROCEDURE insert_data_out(val integer, OUT res text) +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Proc with OUT args'; +INSERT INTO pg14.prctbl VALUES (val); +res := 'insert_data_out():proc-result'::text; +END +$$; +CREATE FUNCTION insert_data_out_fn(val integer, OUT res text) +RETURNS TEXT +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Func with OUT args'; +INSERT INTO pg14.prctbl VALUES (val); +res := 'insert_data_out_fn():func-result'::text; +END; +$$; +CREATE OR REPLACE PROCEDURE proc_varargs( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; +CREATE OR REPLACE PROCEDURE proc_varargs_inout( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; +-- Named arguments +CREATE OR REPLACE PROCEDURE proc_namedargs( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; total := 999; average := 99; +END; $$ +LANGUAGE plpgsql; +-- Mix of IN, OUT, INOUT and Variadic +CREATE OR REPLACE PROCEDURE proc_namedargs_var( + IN inp1 INT, + IN inp2 INT, + INOUT inout1 TEXT, + OUT out1 INT, + VARIADIC list INT[]) +AS $$ +DECLARE sum INT; +BEGIN +out1 := 5; +SELECT INTO sum SUM(list[i]) +FROM generate_subscripts(list, 1) g(i); +RAISE NOTICE 'Input-1: % Input-2: % VarSum: %', inp1, inp2, sum; +SELECT 'Final : ' || inout1 INTO inout1; +END; $$ +LANGUAGE plpgsql; +CREATE OR REPLACE PROCEDURE proc_varargs_inout2( + INOUT result TEXT, + OUT total NUMERIC, + OUT average NUMERIC, + IN inp INT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; +CREATE OR REPLACE PROCEDURE proc_varargs_inout3( + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT, + IN inp INT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; +-- Function overload +CREATE PROCEDURE proc_namedargs_overload( + IN inp INT) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed INT: %', inp; +END; $$ +LANGUAGE plpgsql; +CREATE PROCEDURE proc_namedargs_overload( + IN inp NUMERIC) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed NUMERIC: %', inp; +END; $$ +LANGUAGE plpgsql; +-- Before distribution +CALL insert_data(1); +NOTICE: Proc with no OUT args +CONTEXT: PL/pgSQL function insert_data(integer) line XX at RAISE +CALL insert_data_out(2, 'whythisarg'); +NOTICE: Proc with OUT args +CONTEXT: PL/pgSQL function insert_data_out(integer) line XX at RAISE + res +--------------------------------------------------------------------- + insert_data_out():proc-result +(1 row) + +SELECT insert_data_out_fn(3); +NOTICE: Func with OUT args +CONTEXT: PL/pgSQL function insert_data_out_fn(integer) line XX at RAISE + insert_data_out_fn +--------------------------------------------------------------------- + insert_data_out_fn():func-result +(1 row) + +-- Return the average and the sum of 2, 8, 20 +CALL proc_varargs(1, 1, 1, 2, 8, 20); + total | average +--------------------------------------------------------------------- + 30 | 10.0000000000000000 +(1 row) + +CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20); + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:Testing in/out/var arguments +(1 row) + +CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20); + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:99 +(1 row) + +CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20); + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:TEST COERCE_SQL_SYNTAX +(1 row) + +CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4); +NOTICE: IN passed: 4 +CONTEXT: PL/pgSQL function proc_namedargs(integer,text) line XX at RAISE + total | average | result +--------------------------------------------------------------------- + 999 | 99 | Final:Named args +(1 row) + +CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}'); +NOTICE: Input-1: 1 Input-2: 2 VarSum: 27 +CONTEXT: PL/pgSQL function proc_namedargs_var(integer,integer,text,integer[]) line XX at RAISE + inout1 | out1 +--------------------------------------------------------------------- + Final : INOUT third argument | 5 +(1 row) + +CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20); +NOTICE: IN passed: 5 +CONTEXT: PL/pgSQL function proc_varargs_inout2(text,integer,numeric[]) line XX at RAISE + result | total | average +--------------------------------------------------------------------- + Final:In Out | 30 | 10.0000000000000000 +(1 row) + +CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20); +NOTICE: IN passed: 6 +CONTEXT: PL/pgSQL function proc_varargs_inout3(text,integer,numeric[]) line XX at RAISE + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:In Out +(1 row) + +CALL proc_namedargs_overload(3); +NOTICE: IN passed INT: 3 +CONTEXT: PL/pgSQL function proc_namedargs_overload(integer) line XX at RAISE +CALL proc_namedargs_overload(4.0); +NOTICE: IN passed NUMERIC: 4.0 +CONTEXT: PL/pgSQL function proc_namedargs_overload(numeric) line XX at RAISE +CALL proc_namedargs_overload(inp=>5); +NOTICE: IN passed INT: 5 +CONTEXT: PL/pgSQL function proc_namedargs_overload(integer) line XX at RAISE +CALL proc_namedargs_overload(inp=>6.0); +NOTICE: IN passed NUMERIC: 6.0 +CONTEXT: PL/pgSQL function proc_namedargs_overload(numeric) line XX at RAISE +-- Distribute the table, procedure and function +SELECT create_distributed_table('prctbl', 'val', colocate_with := 'none'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.prctbl$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'insert_data(int)', 'arg1', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'insert_data_out(int)', 'val', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'insert_data_out_fn(int)', 'val', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_varargs(int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_varargs_inout(int, text, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_namedargs(int, text)', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_namedargs_var(int, int, text, int[])', 'inp1', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_varargs_inout2(text, int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_varargs_inout3(text, int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + + SELECT create_distributed_function( + 'proc_namedargs_overload(int)', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'proc_namedargs_overload(numeric)', 'inp', + colocate_with := 'prctbl' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + CREATE TABLE test_proc_table (a int); create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text) language plpgsql @@ -296,22 +641,6 @@ CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); {2} | 2 (1 row) -SELECT create_distributed_table('test_proc_table', 'a'); -NOTICE: Copying data from local table... -NOTICE: copying the data has completed -DETAIL: The local data in the table is no longer visible, but is still on disk. -HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$) - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); - create_distributed_function ---------------------------------------------------------------------- - -(1 row) - -- make sure that metadata is synced, it may take few seconds CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void @@ -329,22 +658,141 @@ SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port t (1 row) --- still, we do not pushdown procedures with OUT parameters +SELECT create_distributed_table('test_proc_table', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- pushdown procedures with OUT parameters SET client_min_messages TO DEBUG1; CALL proc_pushdown(1, NULL, NULL); -DEBUG: not pushing down procedures with OUT parameters +DEBUG: pushing down the procedure created | res_out --------------------------------------------------------------------- {3} | 3 (1 row) CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); -DEBUG: not pushing down procedures with OUT parameters +DEBUG: pushing down the procedure created | res_out --------------------------------------------------------------------- {4} | 4 (1 row) +CALL insert_data(4); +DEBUG: pushing down the procedure +NOTICE: Proc with no OUT args +DETAIL: from localhost:xxxxx +CALL insert_data_out(5, 'whythisarg'); +DEBUG: pushing down the procedure +NOTICE: Proc with OUT args +DETAIL: from localhost:xxxxx + res +--------------------------------------------------------------------- + insert_data_out():proc-result +(1 row) + +SELECT insert_data_out_fn(6); +DEBUG: pushing down the function call +NOTICE: Func with OUT args +DETAIL: from localhost:xxxxx + insert_data_out_fn +--------------------------------------------------------------------- + insert_data_out_fn():func-result +(1 row) + +-- Return the average and the sum of 2, 8, 20 +CALL proc_varargs(1, 1, 1, 2, 8, 20); +DEBUG: pushing down the procedure + total | average +--------------------------------------------------------------------- + 30 | 10.0000000000000000 +(1 row) + +CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20); +DEBUG: pushing down the procedure + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:Testing in/out/var arguments +(1 row) + +CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20); +DEBUG: pushing down the procedure + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:99 +(1 row) + +CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20); +DEBUG: pushing down the procedure + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:TEST COERCE_SQL_SYNTAX +(1 row) + +CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4); +DEBUG: pushing down the procedure +NOTICE: IN passed: 4 +DETAIL: from localhost:xxxxx + total | average | result +--------------------------------------------------------------------- + 999 | 99 | Final:Named args +(1 row) + +CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}'); +DEBUG: pushing down the procedure +NOTICE: Input-1: 1 Input-2: 2 VarSum: 27 +DETAIL: from localhost:xxxxx + inout1 | out1 +--------------------------------------------------------------------- + Final : INOUT third argument | 5 +(1 row) + +CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20); +DEBUG: pushing down the procedure +NOTICE: IN passed: 5 +DETAIL: from localhost:xxxxx + result | total | average +--------------------------------------------------------------------- + Final:In Out | 30 | 10.0000000000000000 +(1 row) + +CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20); +DEBUG: pushing down the procedure +NOTICE: IN passed: 6 +DETAIL: from localhost:xxxxx + total | average | result +--------------------------------------------------------------------- + 30 | 10.0000000000000000 | Final:In Out +(1 row) + +CALL proc_namedargs_overload(3); +DEBUG: pushing down the procedure +NOTICE: IN passed INT: 3 +DETAIL: from localhost:xxxxx +CALL proc_namedargs_overload(4.0); +DEBUG: pushing down the procedure +NOTICE: IN passed NUMERIC: 4.0 +DETAIL: from localhost:xxxxx +CALL proc_namedargs_overload(inp=>5); +DEBUG: pushing down the procedure +NOTICE: IN passed INT: 5 +DETAIL: from localhost:xxxxx +CALL proc_namedargs_overload(inp=>6.0); +DEBUG: pushing down the procedure +NOTICE: IN passed NUMERIC: 6.0 +DETAIL: from localhost:xxxxx RESET client_min_messages; -- we don't need metadata syncing anymore SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index cbb1d755a..62d2eb61d 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -105,6 +105,242 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla RESET citus.multi_shard_modify_mode; -- test procedure OUT parameters with procedure pushdown +CREATE TABLE prctbl(val int primary key); + +CREATE OR REPLACE PROCEDURE insert_data(arg1 integer) +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Proc with no OUT args'; +INSERT INTO pg14.prctbl VALUES (arg1); +END; +$$; + +CREATE PROCEDURE insert_data_out(val integer, OUT res text) +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Proc with OUT args'; +INSERT INTO pg14.prctbl VALUES (val); +res := 'insert_data_out():proc-result'::text; +END +$$; + +CREATE FUNCTION insert_data_out_fn(val integer, OUT res text) +RETURNS TEXT +LANGUAGE PLPGSQL +AS $$ +BEGIN +RAISE NOTICE 'Func with OUT args'; +INSERT INTO pg14.prctbl VALUES (val); +res := 'insert_data_out_fn():func-result'::text; +END; +$$; + +CREATE OR REPLACE PROCEDURE proc_varargs( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE proc_varargs_inout( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; + +-- Named arguments +CREATE OR REPLACE PROCEDURE proc_namedargs( + IN inp INT, + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; total := 999; average := 99; +END; $$ +LANGUAGE plpgsql; + +-- Mix of IN, OUT, INOUT and Variadic +CREATE OR REPLACE PROCEDURE proc_namedargs_var( + IN inp1 INT, + IN inp2 INT, + INOUT inout1 TEXT, + OUT out1 INT, + VARIADIC list INT[]) +AS $$ +DECLARE sum INT; +BEGIN +out1 := 5; +SELECT INTO sum SUM(list[i]) +FROM generate_subscripts(list, 1) g(i); +RAISE NOTICE 'Input-1: % Input-2: % VarSum: %', inp1, inp2, sum; +SELECT 'Final : ' || inout1 INTO inout1; +END; $$ +LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE proc_varargs_inout2( + INOUT result TEXT, + OUT total NUMERIC, + OUT average NUMERIC, + IN inp INT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE proc_varargs_inout3( + OUT total NUMERIC, + OUT average NUMERIC, + INOUT result TEXT, + IN inp INT, + VARIADIC list NUMERIC[]) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed: %', inp; + SELECT 'Final:' || result INTO result; + SELECT INTO total SUM(list[i]) + FROM generate_subscripts(list, 1) g(i); + + SELECT INTO average AVG(list[i]) + FROM generate_subscripts(list, 1) g(i); + +END; $$ +LANGUAGE plpgsql; + +-- Function overload + +CREATE PROCEDURE proc_namedargs_overload( + IN inp INT) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed INT: %', inp; +END; $$ +LANGUAGE plpgsql; + +CREATE PROCEDURE proc_namedargs_overload( + IN inp NUMERIC) +AS $$ +BEGIN + + RAISE NOTICE 'IN passed NUMERIC: %', inp; +END; $$ +LANGUAGE plpgsql; + + +-- Before distribution +CALL insert_data(1); +CALL insert_data_out(2, 'whythisarg'); +SELECT insert_data_out_fn(3); +-- Return the average and the sum of 2, 8, 20 +CALL proc_varargs(1, 1, 1, 2, 8, 20); +CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20); +CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20); +CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20); +CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4); +CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}'); +CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20); +CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20); +CALL proc_namedargs_overload(3); +CALL proc_namedargs_overload(4.0); +CALL proc_namedargs_overload(inp=>5); +CALL proc_namedargs_overload(inp=>6.0); + +-- Distribute the table, procedure and function +SELECT create_distributed_table('prctbl', 'val', colocate_with := 'none'); + +SELECT create_distributed_function( + 'insert_data(int)', 'arg1', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'insert_data_out(int)', 'val', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'insert_data_out_fn(int)', 'val', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_varargs(int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_varargs_inout(int, text, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_namedargs(int, text)', 'inp', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_namedargs_var(int, int, text, int[])', 'inp1', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_varargs_inout2(text, int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_varargs_inout3(text, int, NUMERIC[])', 'inp', + colocate_with := 'prctbl' +); + + SELECT create_distributed_function( + 'proc_namedargs_overload(int)', 'inp', + colocate_with := 'prctbl' +); + +SELECT create_distributed_function( + 'proc_namedargs_overload(numeric)', 'inp', + colocate_with := 'prctbl' +); + CREATE TABLE test_proc_table (a int); create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text) @@ -125,9 +361,6 @@ end;$$; CALL proc_pushdown(1, NULL, NULL); CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); -SELECT create_distributed_table('test_proc_table', 'a'); -SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); - -- make sure that metadata is synced, it may take few seconds CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void @@ -136,10 +369,29 @@ CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 1500 SELECT wait_until_metadata_sync(30000); SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port); --- still, we do not pushdown procedures with OUT parameters +SELECT create_distributed_table('test_proc_table', 'a'); +SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); + +-- pushdown procedures with OUT parameters SET client_min_messages TO DEBUG1; CALL proc_pushdown(1, NULL, NULL); CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); +CALL insert_data(4); +CALL insert_data_out(5, 'whythisarg'); +SELECT insert_data_out_fn(6); +-- Return the average and the sum of 2, 8, 20 +CALL proc_varargs(1, 1, 1, 2, 8, 20); +CALL proc_varargs_inout(1, 1, 1, 'Testing in/out/var arguments'::text, 2, 8, 20); +CALL proc_varargs_inout(2, 1, 1, to_char(99,'FM99'), 2, 8, 20); +CALL proc_varargs_inout(3, 1, 1, TRIM( BOTH FROM ' TEST COERCE_SQL_SYNTAX '), 2, 8, 20); +CALL proc_namedargs(total=>3, result=>'Named args'::text, average=>2::NUMERIC, inp=>4); +CALL proc_namedargs_var(inout1=> 'INOUT third argument'::text, out1=>4, inp2=>2, inp1=>1, variadic list=>'{9, 9, 9}'); +CALL proc_varargs_inout2('In Out', 1, 1, 5, 2, 8, 20); +CALL proc_varargs_inout3(1, 1, 'In Out', 6, 2, 8, 20); +CALL proc_namedargs_overload(3); +CALL proc_namedargs_overload(4.0); +CALL proc_namedargs_overload(inp=>5); +CALL proc_namedargs_overload(inp=>6.0); RESET client_min_messages; -- we don't need metadata syncing anymore