diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 1f0abbb56..10f58a111 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3585,7 +3585,7 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } - EnsureTaskExecutionAllowed(hasRemoteCopy); + EnsureTaskExecutionAllowed(hasRemoteCopy, true); /* * We just error out and code execution should never reach to this diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 895f01ae7..50b5104a5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1384,7 +1384,7 @@ StartDistributedExecution(DistributedExecution *execution) if (execution->remoteTaskList != NIL) { bool isRemote = true; - EnsureTaskExecutionAllowed(isRemote); + EnsureTaskExecutionAllowed(isRemote, true); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 18563c763..3357fdfd4 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -230,7 +230,24 @@ ExecuteLocalTaskListExtended(List *taskList, if (taskList != NIL) { bool isRemote = false; - EnsureTaskExecutionAllowed(isRemote); + if (!EnsureTaskExecutionAllowed(isRemote, false)) + { + /* instead of erroring, let's check further */ + Task *task = NULL; + foreach_ptr(task, taskList) + { + if (!task->safeToPush) + ereport(ERROR, + (errmsg("cannot execute a distributed query from a query on a " + "shard"), + errdetail("Executing a distributed query in a function call that " + "may be pushed to a remote node can lead to incorrect " + "results."), + errhint("Avoid nesting of distributed queries or use alter user " + "current_user set citus.allow_nested_distributed_execution " + "to on to allow it with possible incorrectness."))); + } + } } /* diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index e257b80c6..790382ae9 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -899,15 +899,17 @@ ExecutorBoundParams(void) * a function in a query that gets pushed down to the worker, and the * function performs a query on a distributed table. */ -void -EnsureTaskExecutionAllowed(bool isRemote) +bool +EnsureTaskExecutionAllowed(bool isRemote, bool shouldError) { if (IsTaskExecutionAllowed(isRemote)) { - return; + return true; } - ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " + if (shouldError) + ereport(ERROR, + (errmsg("cannot execute a distributed query from a query on a " "shard"), errdetail("Executing a distributed query in a function call that " "may be pushed to a remote node can lead to incorrect " @@ -915,6 +917,8 @@ EnsureTaskExecutionAllowed(bool isRemote) errhint("Avoid nesting of distributed queries or use alter user " "current_user set citus.allow_nested_distributed_execution " "to on to allow it with possible incorrectness."))); + + return false; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 19d386343..63c634764 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1792,6 +1792,8 @@ CreateTask(TaskType taskType) task->partiallyLocalOrRemote = false; task->relationShardList = NIL; + task->safeToPush = false; + return task; } @@ -2176,6 +2178,56 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, task->replicationModel = replicationModel; task->parametersInQueryStringResolved = parametersInQueryResolved; + StringInfo sqlQueryString = makeStringInfo(); + pg_get_query_def(query, sqlQueryString); + /* log the query string we generated */ + ereport(DEBUG4, (errmsg("generated sql query for task %d", task->taskId), + errdetail("query string: \"%s\"", + sqlQueryString->data))); + + // if (query->hasTargetSRFs) + // if (query->rtable) + // if (query->jointree) + // if (query->targetList) + // if (query->returningList) + /* Check the target list */ + // task->safeToPush = true; + // ListCell *lc; + // bool foundUDF = false; + // foreach (lc, query->targetList) + // { + // TargetEntry *tle = (TargetEntry *) lfirst(lc); + // elog(DEBUG2, "walking target list"); + // if (ContainsUDFWalker((Node *) tle->expr, &foundUDF)) + // { + // task->safeToPush = false; + // elog(DEBUG2, "UNSAFE"); + // // break; + // } + // } + // + /* quick check first */ + // if (colocationId) //FIXME include header for INVALID_COLOCATION_ID ? + // { + // goto exitnow; + // } + + ListCell *lc; + bool foundNonReferenceTable = false; + foreach (lc, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(lc); + // elog(DEBUG2, "walking relation shard list"); + if (!IsCitusTableType(relationShard->relationId, REFERENCE_TABLE)) + { + foundNonReferenceTable = true; + // elog(DEBUG2, "found UNSAFE"); + } + } + + if (!foundNonReferenceTable) + task->safeToPush = true; + return list_make1(task); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 4b4a334c8..6f8f0f650 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -327,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(isLocalTableModification); COPY_SCALAR_FIELD(cannotBeExecutedInTransaction); + COPY_SCALAR_FIELD(safeToPush); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 751063789..b2ecea62a 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -536,6 +536,7 @@ OutTask(OUTFUNC_ARGS) WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_BOOL_FIELD(isLocalTableModification); WRITE_BOOL_FIELD(cannotBeExecutedInTransaction); + WRITE_BOOL_FIELD(safeToPush); } diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6708d9a64..786e0c7be 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -153,7 +153,7 @@ extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern ParamListInfo ExecutorBoundParams(void); -extern void EnsureTaskExecutionAllowed(bool isRemote); +extern bool EnsureTaskExecutionAllowed(bool isRemote, bool shouldError); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 475a41b37..88220c02d 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -334,6 +334,9 @@ typedef struct Task Const *partitionKeyValue; int colocationId; + + /* if it's granted this task nested statements are safe to be executed */ + bool safeToPush; } Task; diff --git a/src/test/regress/expected/functions_pushdown.out b/src/test/regress/expected/functions_pushdown.out new file mode 100644 index 000000000..00cdaf5a6 --- /dev/null +++ b/src/test/regress/expected/functions_pushdown.out @@ -0,0 +1,331 @@ +create schema functions_pushdown; +set search_path to functions_pushdown; +set citus.shard_replication_factor TO 1; +create table reference_table( id bigint primary key, t text); +select create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +create table distributed_table( id bigint primary key + , fk_id bigint REFERENCES reference_table (id) + , t text +); +select create_distributed_table('distributed_table', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE FUNCTION reference_function_sql(i_id bigint) +RETURNS bigint LANGUAGE sql AS +$function$ +SELECT count(*) FROM functions_pushdown.reference_table where id = i_id; +$function$; +CREATE FUNCTION reference_function_plpgsql(i_id bigint) +RETURNS bigint LANGUAGE plpgsql AS +$function$ +DECLARE + result bigint; -- Variable to hold the count +BEGIN + -- Fetch the count into the variable + SELECT count(*) INTO result + FROM functions_pushdown.reference_table + WHERE id = i_id; + -- Return the result + RETURN result; +END; +$function$; +CREATE FUNCTION distributed_function_sql(i_id bigint) +RETURNS bigint LANGUAGE sql AS +$function$ +SELECT count(*) FROM functions_pushdown.distributed_table where id = i_id; +$function$; +CREATE FUNCTION distributed_function_plpgsql(i_id bigint) +RETURNS bigint LANGUAGE plpgsql AS +$function$ +DECLARE + result bigint; -- Variable to hold the count +BEGIN + -- Fetch the count into the variable + SELECT count(*) INTO result + FROM functions_pushdown.distributed_table + WHERE id = i_id; + -- Return the result + RETURN result; +END; +$function$; +select create_distributed_function('distributed_function_sql(bigint)', '$1' + , colocate_with := 'distributed_table'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_function('distributed_function_plpgsql(bigint)', '$1' + , colocate_with := 'distributed_table'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +insert into reference_table values (1, 'a'); +insert into reference_table values (2, 'b'); +insert into reference_table values (3, 'c'); +insert into distributed_table values (1, 1, 'aa'); +insert into distributed_table values (2, 2, 'bb'); +insert into distributed_table values (3, 3, 'cc'); +insert into distributed_table values (4, 2, 'BB'); +-- REFERENCE +select *,reference_function_sql(id) from reference_table order by id; + id | t | reference_function_sql +--------------------------------------------------------------------- + 1 | a | 1 + 2 | b | 1 + 3 | c | 1 +(3 rows) + +select *,reference_function_sql(id) from reference_table where id = 1; + id | t | reference_function_sql +--------------------------------------------------------------------- + 1 | a | 1 +(1 row) + +select *,reference_function_plpgsql(id) from reference_table order by id; + id | t | reference_function_plpgsql +--------------------------------------------------------------------- + 1 | a | 1 + 2 | b | 1 + 3 | c | 1 +(3 rows) + +select *,reference_function_plpgsql(id) from reference_table where id = 1; + id | t | reference_function_plpgsql +--------------------------------------------------------------------- + 1 | a | 1 +(1 row) + +select *,reference_function_sql(id) from distributed_table order by id; + id | fk_id | t | reference_function_sql +--------------------------------------------------------------------- + 1 | 1 | aa | 1 + 2 | 2 | bb | 1 + 3 | 3 | cc | 1 + 4 | 2 | BB | 0 +(4 rows) + +select *,reference_function_sql(id) from distributed_table where id = 1; + id | fk_id | t | reference_function_sql +--------------------------------------------------------------------- + 1 | 1 | aa | 1 +(1 row) + +select *,reference_function_sql(id) from distributed_table where id = 4; + id | fk_id | t | reference_function_sql +--------------------------------------------------------------------- + 4 | 2 | BB | 0 +(1 row) + +select *,reference_function_plpgsql(id) from distributed_table order by id; + id | fk_id | t | reference_function_plpgsql +--------------------------------------------------------------------- + 1 | 1 | aa | 1 + 2 | 2 | bb | 1 + 3 | 3 | cc | 1 + 4 | 2 | BB | 0 +(4 rows) + +select *,reference_function_plpgsql(id) from distributed_table where id = 1; + id | fk_id | t | reference_function_plpgsql +--------------------------------------------------------------------- + 1 | 1 | aa | 1 +(1 row) + +select *,reference_function_plpgsql(id) from distributed_table where id = 4; + id | fk_id | t | reference_function_plpgsql +--------------------------------------------------------------------- + 4 | 2 | BB | 0 +(1 row) + +-- DISTRIBUTE (not supported yet) +select *,distributed_function_sql(id) from reference_table order by id; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_sql(id) from reference_table where id = 1; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_plpgsql(id) from reference_table order by id; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_plpgsql(id) from reference_table where id = 1; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_sql(id) from distributed_table order by id; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_sql(id) from distributed_table where id = 1; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "distributed_function_sql" statement 1 +while executing command on localhost:xxxxx +select *,distributed_function_plpgsql(id) from distributed_table order by id; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL statement "SELECT count(*) FROM functions_pushdown.distributed_table + WHERE id = i_id" +PL/pgSQL function functions_pushdown.distributed_function_plpgsql(bigint) line XX at SQL statement +while executing command on localhost:xxxxx +select *,distributed_function_plpgsql(id) from distributed_table where id = 1; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL statement "SELECT count(*) FROM functions_pushdown.distributed_table + WHERE id = i_id" +PL/pgSQL function functions_pushdown.distributed_function_plpgsql(bigint) line XX at SQL statement +while executing command on localhost:xxxxx +-- some other checks (all should pass) +select reference_function_sql(1); + reference_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_sql(2); + reference_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_sql(3); + reference_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_sql(4); + reference_function_sql +--------------------------------------------------------------------- + 0 +(1 row) + +select reference_function_plpgsql(1); + reference_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_plpgsql(2); + reference_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_plpgsql(3); + reference_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select reference_function_plpgsql(4); + reference_function_plpgsql +--------------------------------------------------------------------- + 0 +(1 row) + +select distributed_function_sql(1); + distributed_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_sql(2); + distributed_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_sql(3); + distributed_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_sql(4); + distributed_function_sql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_plpgsql(1); + distributed_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_plpgsql(2); + distributed_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_plpgsql(3); + distributed_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +select distributed_function_plpgsql(4); + distributed_function_plpgsql +--------------------------------------------------------------------- + 1 +(1 row) + +-- https://github.com/citusdata/citus/issues/5887 +CREATE TABLE functions_pushdown.test(a int); +SELECT create_distributed_table('functions_pushdown.test', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO functions_pushdown.test SELECT i FROM generate_series(1,10)i; +CREATE OR REPLACE FUNCTION some_func() RETURNS integer + AS 'select count(*) FROM functions_pushdown.test;' + LANGUAGE SQL + VOLATILE + RETURNS NULL ON NULL INPUT; +SELECT some_func() FROM functions_pushdown.test; +ERROR: cannot execute a distributed query from a query on a shard +DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results. +HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness. +CONTEXT: SQL function "some_func" statement 1 +while executing command on localhost:xxxxx +drop table distributed_table; +drop table reference_table; +drop schema functions_pushdown cascade; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to function reference_function_sql(bigint) +drop cascades to function reference_function_plpgsql(bigint) +drop cascades to function distributed_function_sql(bigint) +drop cascades to function distributed_function_plpgsql(bigint) +drop cascades to table test +drop cascades to function some_func() +set citus.shard_replication_factor TO 2; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 2ce74e9a7..8f4e45ca2 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -375,6 +375,7 @@ test: distributed_collations_conflict test: function_propagation test: view_propagation test: check_mx +test: functions_pushdown # --------- # deparsing logic tests diff --git a/src/test/regress/sql/functions_pushdown.sql b/src/test/regress/sql/functions_pushdown.sql new file mode 100644 index 000000000..4554571d2 --- /dev/null +++ b/src/test/regress/sql/functions_pushdown.sql @@ -0,0 +1,135 @@ +create schema functions_pushdown; +set search_path to functions_pushdown; +set citus.shard_replication_factor TO 1; + +create table reference_table( id bigint primary key, t text); +select create_reference_table('reference_table'); + +create table distributed_table( id bigint primary key + , fk_id bigint REFERENCES reference_table (id) + , t text +); +select create_distributed_table('distributed_table', 'id'); + +CREATE FUNCTION reference_function_sql(i_id bigint) +RETURNS bigint LANGUAGE sql AS +$function$ +SELECT count(*) FROM functions_pushdown.reference_table where id = i_id; +$function$; + +CREATE FUNCTION reference_function_plpgsql(i_id bigint) +RETURNS bigint LANGUAGE plpgsql AS +$function$ +DECLARE + result bigint; -- Variable to hold the count +BEGIN + -- Fetch the count into the variable + SELECT count(*) INTO result + FROM functions_pushdown.reference_table + WHERE id = i_id; + -- Return the result + RETURN result; +END; +$function$; + +CREATE FUNCTION distributed_function_sql(i_id bigint) +RETURNS bigint LANGUAGE sql AS +$function$ +SELECT count(*) FROM functions_pushdown.distributed_table where id = i_id; +$function$; + +CREATE FUNCTION distributed_function_plpgsql(i_id bigint) +RETURNS bigint LANGUAGE plpgsql AS +$function$ +DECLARE + result bigint; -- Variable to hold the count +BEGIN + -- Fetch the count into the variable + SELECT count(*) INTO result + FROM functions_pushdown.distributed_table + WHERE id = i_id; + -- Return the result + RETURN result; +END; +$function$; + +select create_distributed_function('distributed_function_sql(bigint)', '$1' + , colocate_with := 'distributed_table'); +select create_distributed_function('distributed_function_plpgsql(bigint)', '$1' + , colocate_with := 'distributed_table'); + +insert into reference_table values (1, 'a'); +insert into reference_table values (2, 'b'); +insert into reference_table values (3, 'c'); + +insert into distributed_table values (1, 1, 'aa'); +insert into distributed_table values (2, 2, 'bb'); +insert into distributed_table values (3, 3, 'cc'); +insert into distributed_table values (4, 2, 'BB'); + +-- REFERENCE +select *,reference_function_sql(id) from reference_table order by id; +select *,reference_function_sql(id) from reference_table where id = 1; + +select *,reference_function_plpgsql(id) from reference_table order by id; +select *,reference_function_plpgsql(id) from reference_table where id = 1; + +select *,reference_function_sql(id) from distributed_table order by id; +select *,reference_function_sql(id) from distributed_table where id = 1; +select *,reference_function_sql(id) from distributed_table where id = 4; + +select *,reference_function_plpgsql(id) from distributed_table order by id; +select *,reference_function_plpgsql(id) from distributed_table where id = 1; +select *,reference_function_plpgsql(id) from distributed_table where id = 4; + +-- DISTRIBUTE (not supported yet) +select *,distributed_function_sql(id) from reference_table order by id; +select *,distributed_function_sql(id) from reference_table where id = 1; + +select *,distributed_function_plpgsql(id) from reference_table order by id; +select *,distributed_function_plpgsql(id) from reference_table where id = 1; + +select *,distributed_function_sql(id) from distributed_table order by id; +select *,distributed_function_sql(id) from distributed_table where id = 1; + +select *,distributed_function_plpgsql(id) from distributed_table order by id; +select *,distributed_function_plpgsql(id) from distributed_table where id = 1; + +-- some other checks (all should pass) +select reference_function_sql(1); +select reference_function_sql(2); +select reference_function_sql(3); +select reference_function_sql(4); + +select reference_function_plpgsql(1); +select reference_function_plpgsql(2); +select reference_function_plpgsql(3); +select reference_function_plpgsql(4); + +select distributed_function_sql(1); +select distributed_function_sql(2); +select distributed_function_sql(3); +select distributed_function_sql(4); + +select distributed_function_plpgsql(1); +select distributed_function_plpgsql(2); +select distributed_function_plpgsql(3); +select distributed_function_plpgsql(4); + +-- https://github.com/citusdata/citus/issues/5887 +CREATE TABLE functions_pushdown.test(a int); +SELECT create_distributed_table('functions_pushdown.test', 'a'); +INSERT INTO functions_pushdown.test SELECT i FROM generate_series(1,10)i; + +CREATE OR REPLACE FUNCTION some_func() RETURNS integer + AS 'select count(*) FROM functions_pushdown.test;' + LANGUAGE SQL + VOLATILE + RETURNS NULL ON NULL INPUT; + +SELECT some_func() FROM functions_pushdown.test; + +drop table distributed_table; +drop table reference_table; +drop schema functions_pushdown cascade; +set citus.shard_replication_factor TO 2;