diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index ff547f1cb..98add3f28 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -11,6 +11,9 @@ */ #include "postgres.h" +#include "funcapi.h" + +#include "distributed/pg_version_constants.h" #include "catalog/pg_proc.h" #include "commands/defrem.h" @@ -46,6 +49,9 @@ static bool CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, FuncExpr *funcExpr, DestReceiver *dest); +#if PG_VERSION_NUM >= PG_VERSION_14 +static bool FunctionHasOutOnlyParameter(Oid functionOid); +#endif /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. @@ -142,12 +148,26 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return false; } + +#if PG_VERSION_NUM >= PG_VERSION_14 + + /* + * We might need to add outargs to the funcExpr->args so that they can + * be pushed down. We can implement this in the future. + */ + if (FunctionHasOutOnlyParameter(funcExpr->funcid)) + { + ereport(DEBUG1, (errmsg("not pushing down procedures with OUT parameters"))); + return false; + } +#endif + ereport(DEBUG1, (errmsg("pushing down the procedure"))); /* build remote command with fully qualified names */ StringInfo callCommand = makeStringInfo(); - appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr)); + appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr)); { Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); TupleDesc tupleDesc = CallStmtResultDesc(callStmt); @@ -207,3 +227,53 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return true; } + + +#if PG_VERSION_NUM >= PG_VERSION_14 + +/* + * FunctionHasOutOnlyParameter is a helper function which takes + * a function Oid and returns true if at least one argument of that + * function has the argument mode PROARGMODE_OUT.
+ */ +static bool +FunctionHasOutOnlyParameter(Oid functionOid) +{ + Oid *argTypes = NULL; + char **argNames = NULL; + char *argModes = NULL; + + HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + elog(ERROR, "cache lookup failed for function %u", functionOid); + } + + int numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes); + + if (argModes == NULL) + { + /* short circuit, all arguments are IN */ + ReleaseSysCache(proctup); + + return false; + } + + int argIndex = 0; + for (; argIndex < numberOfArgs; ++argIndex) + { + if (argModes[argIndex] == PROARGMODE_OUT) + { + ReleaseSysCache(proctup); + + return true; + } + } + + ReleaseSysCache(proctup); + + return false; +} + + +#endif diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index 2d7dd89c1..16fb3e8f9 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -226,5 +226,99 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla {"a p","a p","b ","b "} (2 rows) +RESET citus.multi_shard_modify_mode; +-- test procedure OUT parameters with procedure pushdown +CREATE TABLE test_proc_table (a int); +create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text) +language plpgsql +as $$ +DECLARE + res INT := 0; +begin + INSERT INTO pg14.test_proc_table VALUES (dist_key); + SELECT count(*) INTO res FROM pg14.test_proc_table; + created := created || res; + PERFORM array_prepend(res, created); + res_out := res::text; + commit; +end;$$; +-- show the behaviour before distributing +CALL proc_pushdown(1, NULL, NULL); + created | res_out +--------------------------------------------------------------------- + {1} | 1 +(1 row) + +CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); + created | res_out +--------------------------------------------------------------------- + {2} | 2 +(1 row) + +SELECT 
create_distributed_table('test_proc_table', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- make sure that metadata is synced, it may take few seconds +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +SELECT wait_until_metadata_sync(30000); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port); + bool_and +--------------------------------------------------------------------- + t +(1 row) + +-- still, we do not pushdown procedures with OUT parameters +SET client_min_messages TO DEBUG1; +CALL proc_pushdown(1, NULL, NULL); +DEBUG: not pushing down procedures with OUT parameters + created | res_out +--------------------------------------------------------------------- + {3} | 3 +(1 row) + +CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); +DEBUG: not pushing down procedures with OUT parameters + created | res_out +--------------------------------------------------------------------- + {4} | 4 +(1 row) + +RESET client_min_messages; +-- we don't need metadata syncing anymore +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node 
+--------------------------------------------------------------------- + +(1 row) + +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +NOTICE: dropping metadata on the node (localhost,57638) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + set client_min_messages to error; drop schema pg14 cascade; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 405089a70..e5f33cdbf 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -52,7 +52,9 @@ test: subquery_in_targetlist subquery_in_where subquery_complex_target_list test: subquery_prepared_statements test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order test: cte_inline recursive_view_local_table values -test: pg13 pg12 pg14 +test: pg13 pg12 +# run pg14 sequentially as it syncs metadata +test: pg14 test: tableam drop_column_partitioned_table # ---------- diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index 66c39482f..ce36c6922 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -79,5 +79,49 @@ SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY( SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1 )$$) ORDER BY length(result); +RESET citus.multi_shard_modify_mode; + +-- test procedure OUT parameters with procedure pushdown +CREATE TABLE test_proc_table (a int); + +create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text) +language plpgsql +as $$ +DECLARE + res INT := 0; +begin + INSERT INTO pg14.test_proc_table VALUES (dist_key); + SELECT count(*) INTO res FROM pg14.test_proc_table; + created := created || res; + PERFORM array_prepend(res, created); + res_out := res::text; + commit; +end;$$; + +-- show the behaviour before 
distributing +CALL proc_pushdown(1, NULL, NULL); +CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); + +SELECT create_distributed_table('test_proc_table', 'a'); +SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' ); + +-- make sure that metadata is synced, it may take few seconds +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +SELECT wait_until_metadata_sync(30000); +SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port); + +-- still, we do not pushdown procedures with OUT parameters +SET client_min_messages TO DEBUG1; +CALL proc_pushdown(1, NULL, NULL); +CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA'); +RESET client_min_messages; + +-- we don't need metadata syncing anymore +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + set client_min_messages to error; drop schema pg14 cascade;