diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 93f7baf7a..28498e0f2 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -237,16 +237,17 @@ CitusExecutorRun(QueryDesc *queryDesc, * transactions. */ CitusTableCacheFlushInvalidatedEntries(); - - /* - * Within a 2PC, when a function is delegated to a remote node, we pin - * the distribution argument as the shard key for all the SQL in the - * function's block. The restriction is imposed to not to access other - * nodes from the current node and violate the transactional integrity - * of the 2PC. Now that the query is ending, reset the shard key to NULL. - */ - ResetAllowedShardKeyValue(); + InTopLevelDelegatedFunctionCall = false; } + + /* + * Within a 2PC, when a function is delegated to a remote node, we pin + * the distribution argument as the shard key for all the SQL in the + * function's block. The restriction is imposed to not to access other + * nodes from the current node, and violate the transactional integrity + * of the 2PC. Now that the query is ending, reset the shard key to NULL. + */ + CheckAndResetAllowedShardKeyValueIfNeeded(); } PG_CATCH(); { @@ -260,13 +261,15 @@ CitusExecutorRun(QueryDesc *queryDesc, if (ExecutorLevel == 0 && PlannerLevel == 0) { - /* - * In case of an exception, reset the pinned shard-key, for more - * details see the function header. - */ - ResetAllowedShardKeyValue(); + InTopLevelDelegatedFunctionCall = false; } + /* + * In case of an exception, reset the pinned shard-key, for more + * details see the function header. + */ + CheckAndResetAllowedShardKeyValueIfNeeded(); + PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 9ea275035..3ca22f3b1 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -723,6 +723,16 @@ FunctionInFromClause(List *fromlist, Query *query) static void EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId) { + /* + * If the distribution key is already set, the key is fixed until + * the force-delegation function returns. All nested force-delegation + * functions must use the same key. + */ + if (AllowedDistributionColumnValue.isActive) + { + return; + } + /* * The saved distribution argument need to persist through the life * of the query, both during the planning (where we save) and execution @@ -734,6 +744,7 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId) colocationId)); AllowedDistributionColumnValue.distributionColumnValue = copyObject(distArgument); AllowedDistributionColumnValue.colocationId = colocationId; + AllowedDistributionColumnValue.executorLevel = ExecutorLevel; AllowedDistributionColumnValue.isActive = true; MemoryContextSwitchTo(oldcontext); } @@ -747,15 +758,22 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId) * the 2PC. Reset the distribution argument value once the function ends. */ void -ResetAllowedShardKeyValue(void) +CheckAndResetAllowedShardKeyValueIfNeeded(void) { - if (AllowedDistributionColumnValue.isActive) + /* + * If no distribution argument is pinned or the pinned argument was + * set by a nested-executor from upper level, nothing to reset. + */ + if (!AllowedDistributionColumnValue.isActive || + ExecutorLevel > AllowedDistributionColumnValue.executorLevel) { - pfree(AllowedDistributionColumnValue.distributionColumnValue); - AllowedDistributionColumnValue.isActive = false; + return; } - InTopLevelDelegatedFunctionCall = false; + Assert(ExecutorLevel == AllowedDistributionColumnValue.executorLevel); + pfree(AllowedDistributionColumnValue.distributionColumnValue); + AllowedDistributionColumnValue.isActive = false; + AllowedDistributionColumnValue.executorLevel = 0; } @@ -767,6 +785,7 @@ bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId) { Assert(AllowedDistributionColumnValue.isActive); + Assert(ExecutorLevel > AllowedDistributionColumnValue.executorLevel); ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d", pretty_format_node_dump( diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 4fe97e421..78e14367a 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -557,7 +557,8 @@ ResetGlobalVariables() MetadataSyncOnCommit = false; InTopLevelDelegatedFunctionCall = false; ResetWorkerErrorIndication(); - AllowedDistributionColumnValue.isActive = false; + memset(&AllowedDistributionColumnValue, 0, + sizeof(AllowedDistributionColumn)); } diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 5471a3f1d..bf3dbe170 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -23,7 +23,7 @@ extern bool InTopLevelDelegatedFunctionCall; extern bool InDelegatedProcedureCall; PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); -extern void ResetAllowedShardKeyValue(void); +extern void CheckAndResetAllowedShardKeyValueIfNeeded(void); extern bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId); #endif /* FUNCTION_CALL_DELEGATION_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 48bce27a5..5ff34e21a 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -70,6 +70,9 @@ typedef struct AllowedDistributionColumn Const *distributionColumnValue; uint32 colocationId; bool isActive; + + /* In nested executor, track the level at which value is set */ + int executorLevel; } AllowedDistributionColumn; /* diff --git a/src/test/regress/expected/forcedelegation_functions.out b/src/test/regress/expected/forcedelegation_functions.out index 0b266234e..c26f7b75b 100644 --- a/src/test/regress/expected/forcedelegation_functions.out +++ b/src/test/regress/expected/forcedelegation_functions.out @@ -1477,10 +1477,471 @@ NOTICE: INPUT 3 CONTEXT: PL/pgSQL function test(integer) line XX at RAISE SQL statement "SELECT test(3)" PL/pgSQL function inline_code_block line XX at PERFORM +CREATE TABLE testnested_table (x int, y int); +SELECT create_distributed_table('testnested_table','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION inner_fn(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +-- Non-force function calling force-delegation function +CREATE OR REPLACE FUNCTION outer_local_fn() +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM 1 FROM inner_fn(1); + INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3); + PERFORM 1 FROM inner_fn(4); + INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +SELECT create_distributed_function('inner_fn(int)','x', + colocate_with:='testnested_table', force_delegation := true); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT outer_local_fn(); +DEBUG: function does not have co-located tables +DEBUG: pushing down function call in a multi-statement transaction +CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)" +PL/pgSQL function outer_local_fn() line XX at PERFORM +DEBUG: pushing down the function call +CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)" +PL/pgSQL function outer_local_fn() line XX at PERFORM +DEBUG: pushing down function call in a multi-statement transaction +CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)" +PL/pgSQL function outer_local_fn() line XX at PERFORM +DEBUG: pushing down the function call +CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)" +PL/pgSQL function outer_local_fn() line XX at PERFORM + outer_local_fn +--------------------------------------------------------------------- + +(1 row) + +-- Rows from 1-6 should appear +SELECT * FROM testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- + 1 | 1 + 2 | 3 + 4 | 4 + 5 | 6 +(4 rows) + +BEGIN; +SELECT outer_local_fn(); +DEBUG: not pushing down function calls in a multi-statement transaction + outer_local_fn +--------------------------------------------------------------------- + +(1 row) + +END; +SELECT * FROM testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- + 1 | 1 + 1 | 1 + 2 | 3 + 2 | 3 + 4 | 4 + 4 | 4 + 5 | 6 + 5 | 6 +(8 rows) + +DROP FUNCTION inner_fn(int); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +DROP FUNCTION outer_local_fn(); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +TRUNCATE TABLE testnested_table; +CREATE OR REPLACE FUNCTION inner_fn(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +-- Force-delegation function calling non-force function +CREATE OR REPLACE FUNCTION outer_fn(y int, z int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM 1 FROM forcepushdown_schema.inner_fn(y); + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); + PERFORM 1 FROM forcepushdown_schema.inner_fn(z); + INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +SELECT create_distributed_function('inner_fn(int)','x', + colocate_with:='testnested_table', force_delegation := false); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function('outer_fn(int, int)','y', + colocate_with:='testnested_table', force_delegation := true); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT outer_fn(1, 2); +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)" +PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement +SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)" +PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM +while executing command on localhost:xxxxx +BEGIN; +SELECT outer_fn(1, 2); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)" +PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement +SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)" +PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM +while executing command on localhost:xxxxx +END; +-- No rows +SELECT * FROM testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- +(0 rows) + +-- Force-delegation function calling force-delegation function +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); + PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +SELECT create_distributed_function( + 'force_push_outer(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'force_push_inner(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +-- Keys 7,8,9,14 fall on one node and 15 on a different node +-- Function gets delegated to node with shard-key = 7 and inner function +-- will not be delegated but inserts shard-key = 8 locally +SELECT force_push_outer(7); +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)" +PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement +SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1" +PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM +while executing command on localhost:xxxxx +BEGIN; +-- Function gets delegated to node with shard-key = 8 and inner function +-- will not be delegated but inserts shard-key = 9 locally +SELECT force_push_outer(8); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)" +PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement +SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1" +PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM +while executing command on localhost:xxxxx +END; +BEGIN; +-- Function gets delegated to node with shard-key = 14 and inner function +-- will not be delegated but fails to insert shard-key = 15 remotely +SELECT force_push_outer(14); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)" +PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement +SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1" +PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM +while executing command on localhost:xxxxx +END; +SELECT * FROM testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- +(0 rows) + +-- +-- Function-1() --> function-2() --> function-3() +-- +CREATE OR REPLACE FUNCTION force_push_1(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); + PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +CREATE OR REPLACE FUNCTION force_push_2(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); + PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +CREATE OR REPLACE FUNCTION force_push_3(z int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +SELECT create_distributed_function( + 'force_push_1(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'force_push_2(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'force_push_3(int)', 'z', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +TRUNCATE TABLE testnested_table; +BEGIN; +-- All local inserts +SELECT force_push_1(7); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)" +PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement +SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1" +PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM +while executing command on localhost:xxxxx +END; +BEGIN; +-- Local(shard-keys 13, 15) + remote insert (shard-key 14) +SELECT force_push_1(13); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)" +PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement +SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1" +PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM +while executing command on localhost:xxxxx +END; +SELECT * FROM testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- +(0 rows) + +TRUNCATE TABLE testnested_table; +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM FROM forcepushdown_schema.force_push_inner(x); + INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +SELECT create_distributed_function( + 'force_push_inner(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function( + 'force_push_outer(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +SELECT force_push_outer(7); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown +HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false) +CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1)" +PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at SQL statement +while executing command on localhost:xxxxx +END; +TABLE testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- +(0 rows) + +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + RAISE NOTICE '%', y; +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM FROM forcepushdown_schema.force_push_inner(x+1); + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands +BEGIN; +SELECT force_push_outer(9); +DEBUG: pushing down function call in a multi-statement transaction +DEBUG: pushing down the function call +NOTICE: 10 +DETAIL: from localhost:xxxxx + force_push_outer +--------------------------------------------------------------------- + +(1 row) + +END; +TABLE testnested_table ORDER BY 1; + x | y +--------------------------------------------------------------------- + 9 | 9 +(1 row) + RESET client_min_messages; SET citus.log_remote_commands TO off; DROP SCHEMA forcepushdown_schema CASCADE; -NOTICE: drop cascades to 38 other objects +NOTICE: drop cascades to 46 other objects DETAIL: drop cascades to table test_forcepushdown drop cascades to table test_forcepushdown_noncolocate drop cascades to function insert_data(integer) @@ -1519,3 +1980,11 @@ drop cascades to function test_prepare(integer,integer) drop cascades to function outer_test_prepare(integer,integer) drop cascades to table test_perform drop cascades to function test(integer) +drop cascades to table testnested_table +drop cascades to function inner_fn(integer) +drop cascades to function outer_fn(integer,integer) +drop cascades to function force_push_inner(integer) +drop cascades to function force_push_outer(integer) +drop cascades to function force_push_1(integer) +drop cascades to function force_push_2(integer) +drop cascades to function force_push_3(integer) diff --git a/src/test/regress/sql/forcedelegation_functions.sql b/src/test/regress/sql/forcedelegation_functions.sql index 19390c4d8..d95cfc75a 100644 --- a/src/test/regress/sql/forcedelegation_functions.sql +++ b/src/test/regress/sql/forcedelegation_functions.sql @@ -687,6 +687,250 @@ BEGIN END; $$ LANGUAGE plpgsql; +CREATE TABLE testnested_table (x int, y int); +SELECT create_distributed_table('testnested_table','x'); + +CREATE OR REPLACE FUNCTION inner_fn(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; + +-- Non-force function calling force-delegation function +CREATE OR REPLACE FUNCTION outer_local_fn() +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM 1 FROM inner_fn(1); + INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3); + PERFORM 1 FROM inner_fn(4); + INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6); +END; +$$ LANGUAGE plpgsql; + +SELECT create_distributed_function('inner_fn(int)','x', + colocate_with:='testnested_table', force_delegation := true); + +SELECT outer_local_fn(); +-- Rows from 1-6 should appear +SELECT * FROM testnested_table ORDER BY 1; + +BEGIN; +SELECT outer_local_fn(); +END; +SELECT * FROM testnested_table ORDER BY 1; + +DROP FUNCTION inner_fn(int); +DROP FUNCTION outer_local_fn(); +TRUNCATE TABLE testnested_table; + +CREATE OR REPLACE FUNCTION inner_fn(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; + +-- Force-delegation function calling non-force function +CREATE OR REPLACE FUNCTION outer_fn(y int, z int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM 1 FROM forcepushdown_schema.inner_fn(y); + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); + PERFORM 1 FROM forcepushdown_schema.inner_fn(z); + INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z); +END; +$$ LANGUAGE plpgsql; + +SELECT create_distributed_function('inner_fn(int)','x', + colocate_with:='testnested_table', force_delegation := false); +SELECT create_distributed_function('outer_fn(int, int)','y', + colocate_with:='testnested_table', force_delegation := true); + +SELECT outer_fn(1, 2); +BEGIN; +SELECT outer_fn(1, 2); +END; + +-- No rows +SELECT * FROM testnested_table ORDER BY 1; + +-- Force-delegation function calling force-delegation function +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); + PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; + +SELECT create_distributed_function( + 'force_push_outer(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); +SELECT create_distributed_function( + 'force_push_inner(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); + +-- Keys 7,8,9,14 fall on one node and 15 on a different node + +-- Function gets delegated to node with shard-key = 7 and inner function +-- will not be delegated but inserts shard-key = 8 locally +SELECT force_push_outer(7); + +BEGIN; +-- Function gets delegated to node with shard-key = 8 and inner function +-- will not be delegated but inserts shard-key = 9 locally +SELECT force_push_outer(8); +END; + +BEGIN; +-- Function gets delegated to node with shard-key = 14 and inner function +-- will not be delegated but fails to insert shard-key = 15 remotely +SELECT force_push_outer(14); +END; +SELECT * FROM testnested_table ORDER BY 1; + +-- +-- Function-1() --> function-2() --> function-3() +-- +CREATE OR REPLACE FUNCTION force_push_1(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); + PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION force_push_2(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); + PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION force_push_3(z int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z); +END; +$$ LANGUAGE plpgsql; + +SELECT create_distributed_function( + 'force_push_1(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); +SELECT create_distributed_function( + 'force_push_2(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); +SELECT create_distributed_function( + 'force_push_3(int)', 'z', + colocate_with := 'testnested_table', + force_delegation := true +); + +TRUNCATE TABLE testnested_table; +BEGIN; +-- All local inserts +SELECT force_push_1(7); +END; + +BEGIN; +-- Local(shard-keys 13, 15) + remote insert (shard-key 14) +SELECT force_push_1(13); +END; + +SELECT * FROM testnested_table ORDER BY 1; + +TRUNCATE TABLE testnested_table; +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y); +END; +$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM FROM forcepushdown_schema.force_push_inner(x); + INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1); +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function( + 'force_push_inner(int)', 'y', + colocate_with := 'testnested_table', + force_delegation := true +); +SELECT create_distributed_function( + 'force_push_outer(int)', 'x', + colocate_with := 'testnested_table', + force_delegation := true +); + +BEGIN; +SELECT force_push_outer(7); +END; +TABLE testnested_table ORDER BY 1; + +CREATE OR REPLACE FUNCTION force_push_inner(y int) +RETURNS void +AS $$ +DECLARE +BEGIN + RAISE NOTICE '%', y; +END; +$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION force_push_outer(x int) +RETURNS void +AS $$ +DECLARE +BEGIN + PERFORM FROM forcepushdown_schema.force_push_inner(x+1); + INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x); +END; +$$ LANGUAGE plpgsql; + +BEGIN; +SELECT force_push_outer(9); +END; +TABLE testnested_table ORDER BY 1; + RESET client_min_messages; SET citus.log_remote_commands TO off; DROP SCHEMA forcepushdown_schema CASCADE;