pull/7823/merge
Cédric Villemain 2025-06-21 08:55:50 +00:00 committed by GitHub
commit 3f8d88e402
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 553 additions and 8 deletions

View File

@ -3585,7 +3585,7 @@ InitializeCopyShardState(CopyShardState *shardState,
ereport(ERROR, (errmsg("could not connect to any active placements")));
}
EnsureTaskExecutionAllowed(hasRemoteCopy);
EnsureTaskExecutionAllowed(hasRemoteCopy, true);
/*
* We just error out and code execution should never reach to this

View File

@ -1384,7 +1384,7 @@ StartDistributedExecution(DistributedExecution *execution)
if (execution->remoteTaskList != NIL)
{
bool isRemote = true;
EnsureTaskExecutionAllowed(isRemote);
EnsureTaskExecutionAllowed(isRemote, true);
}
}

View File

@ -230,7 +230,24 @@ ExecuteLocalTaskListExtended(List *taskList,
if (taskList != NIL)
{
bool isRemote = false;
EnsureTaskExecutionAllowed(isRemote);
if (!EnsureTaskExecutionAllowed(isRemote, false))
{
/* instead of erroring, let's check further */
Task *task = NULL;
foreach_ptr(task, taskList)
{
if (!task->safeToPush)
ereport(ERROR,
(errmsg("cannot execute a distributed query from a query on a "
"shard"),
errdetail("Executing a distributed query in a function call that "
"may be pushed to a remote node can lead to incorrect "
"results."),
errhint("Avoid nesting of distributed queries or use alter user "
"current_user set citus.allow_nested_distributed_execution "
"to on to allow it with possible incorrectness.")));
}
}
}
/*

View File

@ -899,15 +899,17 @@ ExecutorBoundParams(void)
* a function in a query that gets pushed down to the worker, and the
* function performs a query on a distributed table.
*/
void
EnsureTaskExecutionAllowed(bool isRemote)
bool
EnsureTaskExecutionAllowed(bool isRemote, bool shouldError)
{
if (IsTaskExecutionAllowed(isRemote))
{
return;
return true;
}
ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a "
if (shouldError)
ereport(ERROR,
(errmsg("cannot execute a distributed query from a query on a "
"shard"),
errdetail("Executing a distributed query in a function call that "
"may be pushed to a remote node can lead to incorrect "
@ -915,6 +917,8 @@ EnsureTaskExecutionAllowed(bool isRemote)
errhint("Avoid nesting of distributed queries or use alter user "
"current_user set citus.allow_nested_distributed_execution "
"to on to allow it with possible incorrectness.")));
return false;
}

View File

@ -1792,6 +1792,8 @@ CreateTask(TaskType taskType)
task->partiallyLocalOrRemote = false;
task->relationShardList = NIL;
task->safeToPush = false;
return task;
}
@ -2176,6 +2178,56 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
task->replicationModel = replicationModel;
task->parametersInQueryStringResolved = parametersInQueryResolved;
StringInfo sqlQueryString = makeStringInfo();
pg_get_query_def(query, sqlQueryString);
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for task %d", task->taskId),
errdetail("query string: \"%s\"",
sqlQueryString->data)));
// if (query->hasTargetSRFs)
// if (query->rtable)
// if (query->jointree)
// if (query->targetList)
// if (query->returningList)
/* Check the target list */
// task->safeToPush = true;
// ListCell *lc;
// bool foundUDF = false;
// foreach (lc, query->targetList)
// {
// TargetEntry *tle = (TargetEntry *) lfirst(lc);
// elog(DEBUG2, "walking target list");
// if (ContainsUDFWalker((Node *) tle->expr, &foundUDF))
// {
// task->safeToPush = false;
// elog(DEBUG2, "UNSAFE");
// // break;
// }
// }
//
/* quick check first */
// if (colocationId) //FIXME include header for INVALID_COLOCATION_ID ?
// {
// goto exitnow;
// }
ListCell *lc;
bool foundNonReferenceTable = false;
foreach (lc, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(lc);
// elog(DEBUG2, "walking relation shard list");
if (!IsCitusTableType(relationShard->relationId, REFERENCE_TABLE))
{
foundNonReferenceTable = true;
// elog(DEBUG2, "found UNSAFE");
}
}
if (!foundNonReferenceTable)
task->safeToPush = true;
return list_make1(task);
}

View File

@ -327,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
COPY_SCALAR_FIELD(isLocalTableModification);
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
COPY_SCALAR_FIELD(safeToPush);
}

View File

@ -536,6 +536,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
WRITE_BOOL_FIELD(isLocalTableModification);
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
WRITE_BOOL_FIELD(safeToPush);
}

View File

@ -153,7 +153,7 @@ extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern ParamListInfo ExecutorBoundParams(void);
extern void EnsureTaskExecutionAllowed(bool isRemote);
extern bool EnsureTaskExecutionAllowed(bool isRemote, bool shouldError);
#endif /* MULTI_EXECUTOR_H */

View File

@ -334,6 +334,9 @@ typedef struct Task
Const *partitionKeyValue;
int colocationId;
/* if it's granted this task nested statements are safe to be executed */
bool safeToPush;
} Task;

View File

@ -0,0 +1,331 @@
create schema functions_pushdown;
set search_path to functions_pushdown;
set citus.shard_replication_factor TO 1;
create table reference_table( id bigint primary key, t text);
select create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
create table distributed_table( id bigint primary key
, fk_id bigint REFERENCES reference_table (id)
, t text
);
select create_distributed_table('distributed_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE FUNCTION reference_function_sql(i_id bigint)
RETURNS bigint LANGUAGE sql AS
$function$
SELECT count(*) FROM functions_pushdown.reference_table where id = i_id;
$function$;
CREATE FUNCTION reference_function_plpgsql(i_id bigint)
RETURNS bigint LANGUAGE plpgsql AS
$function$
DECLARE
result bigint; -- Variable to hold the count
BEGIN
-- Fetch the count into the variable
SELECT count(*) INTO result
FROM functions_pushdown.reference_table
WHERE id = i_id;
-- Return the result
RETURN result;
END;
$function$;
CREATE FUNCTION distributed_function_sql(i_id bigint)
RETURNS bigint LANGUAGE sql AS
$function$
SELECT count(*) FROM functions_pushdown.distributed_table where id = i_id;
$function$;
CREATE FUNCTION distributed_function_plpgsql(i_id bigint)
RETURNS bigint LANGUAGE plpgsql AS
$function$
DECLARE
result bigint; -- Variable to hold the count
BEGIN
-- Fetch the count into the variable
SELECT count(*) INTO result
FROM functions_pushdown.distributed_table
WHERE id = i_id;
-- Return the result
RETURN result;
END;
$function$;
select create_distributed_function('distributed_function_sql(bigint)', '$1'
, colocate_with := 'distributed_table');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('distributed_function_plpgsql(bigint)', '$1'
, colocate_with := 'distributed_table');
create_distributed_function
---------------------------------------------------------------------
(1 row)
insert into reference_table values (1, 'a');
insert into reference_table values (2, 'b');
insert into reference_table values (3, 'c');
insert into distributed_table values (1, 1, 'aa');
insert into distributed_table values (2, 2, 'bb');
insert into distributed_table values (3, 3, 'cc');
insert into distributed_table values (4, 2, 'BB');
-- REFERENCE
select *,reference_function_sql(id) from reference_table order by id;
id | t | reference_function_sql
---------------------------------------------------------------------
1 | a | 1
2 | b | 1
3 | c | 1
(3 rows)
select *,reference_function_sql(id) from reference_table where id = 1;
id | t | reference_function_sql
---------------------------------------------------------------------
1 | a | 1
(1 row)
select *,reference_function_plpgsql(id) from reference_table order by id;
id | t | reference_function_plpgsql
---------------------------------------------------------------------
1 | a | 1
2 | b | 1
3 | c | 1
(3 rows)
select *,reference_function_plpgsql(id) from reference_table where id = 1;
id | t | reference_function_plpgsql
---------------------------------------------------------------------
1 | a | 1
(1 row)
select *,reference_function_sql(id) from distributed_table order by id;
id | fk_id | t | reference_function_sql
---------------------------------------------------------------------
1 | 1 | aa | 1
2 | 2 | bb | 1
3 | 3 | cc | 1
4 | 2 | BB | 0
(4 rows)
select *,reference_function_sql(id) from distributed_table where id = 1;
id | fk_id | t | reference_function_sql
---------------------------------------------------------------------
1 | 1 | aa | 1
(1 row)
select *,reference_function_sql(id) from distributed_table where id = 4;
id | fk_id | t | reference_function_sql
---------------------------------------------------------------------
4 | 2 | BB | 0
(1 row)
select *,reference_function_plpgsql(id) from distributed_table order by id;
id | fk_id | t | reference_function_plpgsql
---------------------------------------------------------------------
1 | 1 | aa | 1
2 | 2 | bb | 1
3 | 3 | cc | 1
4 | 2 | BB | 0
(4 rows)
select *,reference_function_plpgsql(id) from distributed_table where id = 1;
id | fk_id | t | reference_function_plpgsql
---------------------------------------------------------------------
1 | 1 | aa | 1
(1 row)
select *,reference_function_plpgsql(id) from distributed_table where id = 4;
id | fk_id | t | reference_function_plpgsql
---------------------------------------------------------------------
4 | 2 | BB | 0
(1 row)
-- DISTRIBUTE (not supported yet)
select *,distributed_function_sql(id) from reference_table order by id;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_sql(id) from reference_table where id = 1;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_plpgsql(id) from reference_table order by id;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_plpgsql(id) from reference_table where id = 1;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_sql(id) from distributed_table order by id;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_sql(id) from distributed_table where id = 1;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "distributed_function_sql" statement 1
while executing command on localhost:xxxxx
select *,distributed_function_plpgsql(id) from distributed_table order by id;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL statement "SELECT count(*) FROM functions_pushdown.distributed_table
WHERE id = i_id"
PL/pgSQL function functions_pushdown.distributed_function_plpgsql(bigint) line XX at SQL statement
while executing command on localhost:xxxxx
select *,distributed_function_plpgsql(id) from distributed_table where id = 1;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL statement "SELECT count(*) FROM functions_pushdown.distributed_table
WHERE id = i_id"
PL/pgSQL function functions_pushdown.distributed_function_plpgsql(bigint) line XX at SQL statement
while executing command on localhost:xxxxx
-- some other checks (all should pass)
select reference_function_sql(1);
reference_function_sql
---------------------------------------------------------------------
1
(1 row)
select reference_function_sql(2);
reference_function_sql
---------------------------------------------------------------------
1
(1 row)
select reference_function_sql(3);
reference_function_sql
---------------------------------------------------------------------
1
(1 row)
select reference_function_sql(4);
reference_function_sql
---------------------------------------------------------------------
0
(1 row)
select reference_function_plpgsql(1);
reference_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select reference_function_plpgsql(2);
reference_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select reference_function_plpgsql(3);
reference_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select reference_function_plpgsql(4);
reference_function_plpgsql
---------------------------------------------------------------------
0
(1 row)
select distributed_function_sql(1);
distributed_function_sql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_sql(2);
distributed_function_sql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_sql(3);
distributed_function_sql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_sql(4);
distributed_function_sql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_plpgsql(1);
distributed_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_plpgsql(2);
distributed_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_plpgsql(3);
distributed_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
select distributed_function_plpgsql(4);
distributed_function_plpgsql
---------------------------------------------------------------------
1
(1 row)
-- https://github.com/citusdata/citus/issues/5887
CREATE TABLE functions_pushdown.test(a int);
SELECT create_distributed_table('functions_pushdown.test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO functions_pushdown.test SELECT i FROM generate_series(1,10)i;
CREATE OR REPLACE FUNCTION some_func() RETURNS integer
AS 'select count(*) FROM functions_pushdown.test;'
LANGUAGE SQL
VOLATILE
RETURNS NULL ON NULL INPUT;
SELECT some_func() FROM functions_pushdown.test;
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: SQL function "some_func" statement 1
while executing command on localhost:xxxxx
drop table distributed_table;
drop table reference_table;
drop schema functions_pushdown cascade;
NOTICE: drop cascades to 6 other objects
DETAIL: drop cascades to function reference_function_sql(bigint)
drop cascades to function reference_function_plpgsql(bigint)
drop cascades to function distributed_function_sql(bigint)
drop cascades to function distributed_function_plpgsql(bigint)
drop cascades to table test
drop cascades to function some_func()
set citus.shard_replication_factor TO 2;

View File

@ -375,6 +375,7 @@ test: distributed_collations_conflict
test: function_propagation
test: view_propagation
test: check_mx
test: functions_pushdown
# ---------
# deparsing logic tests

View File

@ -0,0 +1,135 @@
create schema functions_pushdown;
set search_path to functions_pushdown;
set citus.shard_replication_factor TO 1;
create table reference_table( id bigint primary key, t text);
select create_reference_table('reference_table');
create table distributed_table( id bigint primary key
, fk_id bigint REFERENCES reference_table (id)
, t text
);
select create_distributed_table('distributed_table', 'id');
CREATE FUNCTION reference_function_sql(i_id bigint)
RETURNS bigint LANGUAGE sql AS
$function$
SELECT count(*) FROM functions_pushdown.reference_table where id = i_id;
$function$;
CREATE FUNCTION reference_function_plpgsql(i_id bigint)
RETURNS bigint LANGUAGE plpgsql AS
$function$
DECLARE
result bigint; -- Variable to hold the count
BEGIN
-- Fetch the count into the variable
SELECT count(*) INTO result
FROM functions_pushdown.reference_table
WHERE id = i_id;
-- Return the result
RETURN result;
END;
$function$;
CREATE FUNCTION distributed_function_sql(i_id bigint)
RETURNS bigint LANGUAGE sql AS
$function$
SELECT count(*) FROM functions_pushdown.distributed_table where id = i_id;
$function$;
CREATE FUNCTION distributed_function_plpgsql(i_id bigint)
RETURNS bigint LANGUAGE plpgsql AS
$function$
DECLARE
result bigint; -- Variable to hold the count
BEGIN
-- Fetch the count into the variable
SELECT count(*) INTO result
FROM functions_pushdown.distributed_table
WHERE id = i_id;
-- Return the result
RETURN result;
END;
$function$;
select create_distributed_function('distributed_function_sql(bigint)', '$1'
, colocate_with := 'distributed_table');
select create_distributed_function('distributed_function_plpgsql(bigint)', '$1'
, colocate_with := 'distributed_table');
insert into reference_table values (1, 'a');
insert into reference_table values (2, 'b');
insert into reference_table values (3, 'c');
insert into distributed_table values (1, 1, 'aa');
insert into distributed_table values (2, 2, 'bb');
insert into distributed_table values (3, 3, 'cc');
insert into distributed_table values (4, 2, 'BB');
-- REFERENCE
select *,reference_function_sql(id) from reference_table order by id;
select *,reference_function_sql(id) from reference_table where id = 1;
select *,reference_function_plpgsql(id) from reference_table order by id;
select *,reference_function_plpgsql(id) from reference_table where id = 1;
select *,reference_function_sql(id) from distributed_table order by id;
select *,reference_function_sql(id) from distributed_table where id = 1;
select *,reference_function_sql(id) from distributed_table where id = 4;
select *,reference_function_plpgsql(id) from distributed_table order by id;
select *,reference_function_plpgsql(id) from distributed_table where id = 1;
select *,reference_function_plpgsql(id) from distributed_table where id = 4;
-- DISTRIBUTE (not supported yet)
select *,distributed_function_sql(id) from reference_table order by id;
select *,distributed_function_sql(id) from reference_table where id = 1;
select *,distributed_function_plpgsql(id) from reference_table order by id;
select *,distributed_function_plpgsql(id) from reference_table where id = 1;
select *,distributed_function_sql(id) from distributed_table order by id;
select *,distributed_function_sql(id) from distributed_table where id = 1;
select *,distributed_function_plpgsql(id) from distributed_table order by id;
select *,distributed_function_plpgsql(id) from distributed_table where id = 1;
-- some other checks (all should pass)
select reference_function_sql(1);
select reference_function_sql(2);
select reference_function_sql(3);
select reference_function_sql(4);
select reference_function_plpgsql(1);
select reference_function_plpgsql(2);
select reference_function_plpgsql(3);
select reference_function_plpgsql(4);
select distributed_function_sql(1);
select distributed_function_sql(2);
select distributed_function_sql(3);
select distributed_function_sql(4);
select distributed_function_plpgsql(1);
select distributed_function_plpgsql(2);
select distributed_function_plpgsql(3);
select distributed_function_plpgsql(4);
-- https://github.com/citusdata/citus/issues/5887
CREATE TABLE functions_pushdown.test(a int);
SELECT create_distributed_table('functions_pushdown.test', 'a');
INSERT INTO functions_pushdown.test SELECT i FROM generate_series(1,10)i;
CREATE OR REPLACE FUNCTION some_func() RETURNS integer
AS 'select count(*) FROM functions_pushdown.test;'
LANGUAGE SQL
VOLATILE
RETURNS NULL ON NULL INPUT;
SELECT some_func() FROM functions_pushdown.test;
drop table distributed_table;
drop table reference_table;
drop schema functions_pushdown cascade;
set citus.shard_replication_factor TO 2;