Support OUT parameters in procedure pushdown delegation

In PG 14, procedures can have OUT parameters. In Citus' procedure
delegation framework, we need to adjust the function expression
to get the outargs parameters.

Releven PG change:
e56bce5d43
pull/5209/head
Onder Kalaci 2021-09-01 14:35:41 +02:00 committed by Sait Talha Nisanci
parent 1ff7186d20
commit 5844ab286c
4 changed files with 212 additions and 2 deletions

View File

@ -11,6 +11,9 @@
*/
#include "postgres.h"
#include "funcapi.h"
#include "distributed/pg_version_constants.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
@ -46,6 +49,9 @@
static bool CallFuncExprRemotely(CallStmt *callStmt,
DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, DestReceiver *dest);
#if PG_VERSION_NUM >= PG_VERSION_14
static bool FunctionHasOutOnlyParameter(Oid functionOid);
#endif
/*
* CallDistributedProcedureRemotely calls a stored procedure on the worker if possible.
@ -142,12 +148,26 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_14
/*
* We might need to add outargs to the funcExpr->args so that they can
* be pushed down. We can implement in the future.
*/
if (FunctionHasOutOnlyParameter(funcExpr->funcid))
{
ereport(DEBUG1, (errmsg("not pushing down procedures with OUT parameters")));
return false;
}
#endif
ereport(DEBUG1, (errmsg("pushing down the procedure")));
/* build remote command with fully qualified names */
StringInfo callCommand = makeStringInfo();
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr));
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr));
{
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
@ -207,3 +227,53 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return true;
}
#if PG_VERSION_NUM >= PG_VERSION_14
/*
* FunctionHasOutOnlyParameter is a helper function which takes
* a function Oid and returns true if the input function has at least
* one OUT parameter.
*/
static bool
FunctionHasOutOnlyParameter(Oid functionOid)
{
Oid *argTypes = NULL;
char **argNames = NULL;
char *argModes = NULL;
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "cache lookup failed for function %u", functionOid);
}
int numberOfArgs = get_func_arg_info(proctup, &argTypes, &argNames, &argModes);
if (argModes == NULL)
{
/* short circuit, all arguments are IN */
ReleaseSysCache(proctup);
return false;
}
int argIndex = 0;
for (; argIndex < numberOfArgs; ++argIndex)
{
if (argModes[argIndex] == PROARGMODE_OUT)
{
ReleaseSysCache(proctup);
return true;
}
}
ReleaseSysCache(proctup);
return false;
}
#endif

View File

@ -226,5 +226,99 @@ SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regcla
{"a p","a p","b ","b "}
(2 rows)
RESET citus.multi_shard_modify_mode;
-- test procedure OUT parameters with procedure pushdown
CREATE TABLE test_proc_table (a int);
create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text)
language plpgsql
as $$
DECLARE
res INT := 0;
begin
INSERT INTO pg14.test_proc_table VALUES (dist_key);
SELECT count(*) INTO res FROM pg14.test_proc_table;
created := created || res;
PERFORM array_prepend(res, created);
res_out := res::text;
commit;
end;$$;
-- show the behaviour before distributing
CALL proc_pushdown(1, NULL, NULL);
created | res_out
---------------------------------------------------------------------
{1} | 1
(1 row)
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
created | res_out
---------------------------------------------------------------------
{2} | 2
(1 row)
SELECT create_distributed_table('test_proc_table', 'a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg14.test_proc_table$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- make sure that metadata is synced, it may take few seconds
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port);
bool_and
---------------------------------------------------------------------
t
(1 row)
-- still, we do not pushdown procedures with OUT parameters
SET client_min_messages TO DEBUG1;
CALL proc_pushdown(1, NULL, NULL);
DEBUG: not pushing down procedures with OUT parameters
created | res_out
---------------------------------------------------------------------
{3} | 3
(1 row)
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
DEBUG: not pushing down procedures with OUT parameters
created | res_out
---------------------------------------------------------------------
{4} | 4
(1 row)
RESET client_min_messages;
-- we don't need metadata syncing anymore
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
set client_min_messages to error;
drop schema pg14 cascade;

View File

@ -52,7 +52,9 @@ test: subquery_in_targetlist subquery_in_where subquery_complex_target_list
test: subquery_prepared_statements
test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
test: cte_inline recursive_view_local_table values
test: pg13 pg12 pg14
test: pg13 pg12
# run pg14 sequentially as it syncs metadata
test: pg14
test: tableam drop_column_partitioned_table
# ----------

View File

@ -79,5 +79,49 @@ SELECT result AS column_compression FROM run_command_on_workers($$SELECT ARRAY(
SELECT attname || ' ' || attcompression FROM pg_attribute WHERE attrelid::regclass::text LIKE 'pg14.col\_compression%' AND attnum > 0 ORDER BY 1
)$$) ORDER BY length(result);
RESET citus.multi_shard_modify_mode;
-- test procedure OUT parameters with procedure pushdown
CREATE TABLE test_proc_table (a int);
create or replace procedure proc_pushdown(dist_key integer, OUT created int4[], OUT res_out text)
language plpgsql
as $$
DECLARE
res INT := 0;
begin
INSERT INTO pg14.test_proc_table VALUES (dist_key);
SELECT count(*) INTO res FROM pg14.test_proc_table;
created := created || res;
PERFORM array_prepend(res, created);
res_out := res::text;
commit;
end;$$;
-- show the behaviour before distributing
CALL proc_pushdown(1, NULL, NULL);
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
SELECT create_distributed_table('test_proc_table', 'a');
SELECT create_distributed_function('proc_pushdown(integer)', 'dist_key', 'test_proc_table' );
-- make sure that metadata is synced, it may take few seconds
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
SELECT wait_until_metadata_sync(30000);
SELECT bool_and(hasmetadata) FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port);
-- still, we do not pushdown procedures with OUT parameters
SET client_min_messages TO DEBUG1;
CALL proc_pushdown(1, NULL, NULL);
CALL proc_pushdown(1, ARRAY[2000,1], 'AAAA');
RESET client_min_messages;
-- we don't need metadata syncing anymore
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
set client_min_messages to error;
drop schema pg14 cascade;