Merge branch 'master' into velioglu/function_propagation

velioglu/tmpfuncprop
Burak Velioglu 2022-02-18 01:51:10 +03:00
commit b8267b1fad
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
7 changed files with 761 additions and 22 deletions

View File

@ -237,16 +237,17 @@ CitusExecutorRun(QueryDesc *queryDesc,
* transactions. * transactions.
*/ */
CitusTableCacheFlushInvalidatedEntries(); CitusTableCacheFlushInvalidatedEntries();
InTopLevelDelegatedFunctionCall = false;
/*
* Within a 2PC, when a function is delegated to a remote node, we pin
* the distribution argument as the shard key for all the SQL in the
* function's block. The restriction is imposed to not to access other
* nodes from the current node and violate the transactional integrity
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
*/
ResetAllowedShardKeyValue();
} }
/*
* Within a 2PC, when a function is delegated to a remote node, we pin
* the distribution argument as the shard key for all the SQL in the
* function's block. The restriction is imposed to not to access other
* nodes from the current node, and violate the transactional integrity
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
*/
CheckAndResetAllowedShardKeyValueIfNeeded();
} }
PG_CATCH(); PG_CATCH();
{ {
@ -260,13 +261,15 @@ CitusExecutorRun(QueryDesc *queryDesc,
if (ExecutorLevel == 0 && PlannerLevel == 0) if (ExecutorLevel == 0 && PlannerLevel == 0)
{ {
/* InTopLevelDelegatedFunctionCall = false;
* In case of an exception, reset the pinned shard-key, for more
* details see the function header.
*/
ResetAllowedShardKeyValue();
} }
/*
* In case of an exception, reset the pinned shard-key, for more
* details see the function header.
*/
CheckAndResetAllowedShardKeyValueIfNeeded();
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();

View File

@ -723,6 +723,16 @@ FunctionInFromClause(List *fromlist, Query *query)
static void static void
EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId) EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
{ {
/*
* If the distribution key is already set, the key is fixed until
* the force-delegation function returns. All nested force-delegation
* functions must use the same key.
*/
if (AllowedDistributionColumnValue.isActive)
{
return;
}
/* /*
* The saved distribution argument need to persist through the life * The saved distribution argument need to persist through the life
* of the query, both during the planning (where we save) and execution * of the query, both during the planning (where we save) and execution
@ -734,6 +744,7 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
colocationId)); colocationId));
AllowedDistributionColumnValue.distributionColumnValue = copyObject(distArgument); AllowedDistributionColumnValue.distributionColumnValue = copyObject(distArgument);
AllowedDistributionColumnValue.colocationId = colocationId; AllowedDistributionColumnValue.colocationId = colocationId;
AllowedDistributionColumnValue.executorLevel = ExecutorLevel;
AllowedDistributionColumnValue.isActive = true; AllowedDistributionColumnValue.isActive = true;
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
@ -747,15 +758,22 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
* the 2PC. Reset the distribution argument value once the function ends. * the 2PC. Reset the distribution argument value once the function ends.
*/ */
void void
ResetAllowedShardKeyValue(void) CheckAndResetAllowedShardKeyValueIfNeeded(void)
{ {
if (AllowedDistributionColumnValue.isActive) /*
* If no distribution argument is pinned or the pinned argument was
* set by a nested-executor from upper level, nothing to reset.
*/
if (!AllowedDistributionColumnValue.isActive ||
ExecutorLevel > AllowedDistributionColumnValue.executorLevel)
{ {
pfree(AllowedDistributionColumnValue.distributionColumnValue); return;
AllowedDistributionColumnValue.isActive = false;
} }
InTopLevelDelegatedFunctionCall = false; Assert(ExecutorLevel == AllowedDistributionColumnValue.executorLevel);
pfree(AllowedDistributionColumnValue.distributionColumnValue);
AllowedDistributionColumnValue.isActive = false;
AllowedDistributionColumnValue.executorLevel = 0;
} }
@ -767,6 +785,7 @@ bool
IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId) IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId)
{ {
Assert(AllowedDistributionColumnValue.isActive); Assert(AllowedDistributionColumnValue.isActive);
Assert(ExecutorLevel > AllowedDistributionColumnValue.executorLevel);
ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d", ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d",
pretty_format_node_dump( pretty_format_node_dump(

View File

@ -557,7 +557,8 @@ ResetGlobalVariables()
MetadataSyncOnCommit = false; MetadataSyncOnCommit = false;
InTopLevelDelegatedFunctionCall = false; InTopLevelDelegatedFunctionCall = false;
ResetWorkerErrorIndication(); ResetWorkerErrorIndication();
AllowedDistributionColumnValue.isActive = false; memset(&AllowedDistributionColumnValue, 0,
sizeof(AllowedDistributionColumn));
} }

View File

@ -23,7 +23,7 @@ extern bool InTopLevelDelegatedFunctionCall;
extern bool InDelegatedProcedureCall; extern bool InDelegatedProcedureCall;
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);
extern void ResetAllowedShardKeyValue(void); extern void CheckAndResetAllowedShardKeyValueIfNeeded(void);
extern bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId); extern bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId);
#endif /* FUNCTION_CALL_DELEGATION_H */ #endif /* FUNCTION_CALL_DELEGATION_H */

View File

@ -70,6 +70,9 @@ typedef struct AllowedDistributionColumn
Const *distributionColumnValue; Const *distributionColumnValue;
uint32 colocationId; uint32 colocationId;
bool isActive; bool isActive;
/* In nested executor, track the level at which value is set */
int executorLevel;
} AllowedDistributionColumn; } AllowedDistributionColumn;
/* /*

View File

@ -1477,10 +1477,471 @@ NOTICE: INPUT 3
CONTEXT: PL/pgSQL function test(integer) line XX at RAISE CONTEXT: PL/pgSQL function test(integer) line XX at RAISE
SQL statement "SELECT test(3)" SQL statement "SELECT test(3)"
PL/pgSQL function inline_code_block line XX at PERFORM PL/pgSQL function inline_code_block line XX at PERFORM
CREATE TABLE testnested_table (x int, y int);
SELECT create_distributed_table('testnested_table','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION inner_fn(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
-- Non-force function calling force-delegation function
CREATE OR REPLACE FUNCTION outer_local_fn()
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM 1 FROM inner_fn(1);
INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3);
PERFORM 1 FROM inner_fn(4);
INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT create_distributed_function('inner_fn(int)','x',
colocate_with:='testnested_table', force_delegation := true);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT outer_local_fn();
DEBUG: function does not have co-located tables
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)"
PL/pgSQL function outer_local_fn() line XX at PERFORM
DEBUG: pushing down the function call
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)"
PL/pgSQL function outer_local_fn() line XX at PERFORM
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)"
PL/pgSQL function outer_local_fn() line XX at PERFORM
DEBUG: pushing down the function call
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)"
PL/pgSQL function outer_local_fn() line XX at PERFORM
outer_local_fn
---------------------------------------------------------------------
(1 row)
-- Rows from 1-6 should appear
SELECT * FROM testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
1 | 1
2 | 3
4 | 4
5 | 6
(4 rows)
BEGIN;
SELECT outer_local_fn();
DEBUG: not pushing down function calls in a multi-statement transaction
outer_local_fn
---------------------------------------------------------------------
(1 row)
END;
SELECT * FROM testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
1 | 1
1 | 1
2 | 3
2 | 3
4 | 4
4 | 4
5 | 6
5 | 6
(8 rows)
DROP FUNCTION inner_fn(int);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
DROP FUNCTION outer_local_fn();
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
TRUNCATE TABLE testnested_table;
CREATE OR REPLACE FUNCTION inner_fn(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
-- Force-delegation function calling non-force function
CREATE OR REPLACE FUNCTION outer_fn(y int, z int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM 1 FROM forcepushdown_schema.inner_fn(y);
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
PERFORM 1 FROM forcepushdown_schema.inner_fn(z);
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT create_distributed_function('inner_fn(int)','x',
colocate_with:='testnested_table', force_delegation := false);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('outer_fn(int, int)','y',
colocate_with:='testnested_table', force_delegation := true);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT outer_fn(1, 2);
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)"
PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement
SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)"
PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM
while executing command on localhost:xxxxx
BEGIN;
SELECT outer_fn(1, 2);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)"
PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement
SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)"
PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM
while executing command on localhost:xxxxx
END;
-- No rows
SELECT * FROM testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
(0 rows)
-- Force-delegation function calling force-delegation function
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT create_distributed_function(
'force_push_outer(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'force_push_inner(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- Keys 7,8,9,14 fall on one node and 15 on a different node
-- Function gets delegated to node with shard-key = 7 and inner function
-- will not be delegated but inserts shard-key = 8 locally
SELECT force_push_outer(7);
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
while executing command on localhost:xxxxx
BEGIN;
-- Function gets delegated to node with shard-key = 8 and inner function
-- will not be delegated but inserts shard-key = 9 locally
SELECT force_push_outer(8);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
while executing command on localhost:xxxxx
END;
BEGIN;
-- Function gets delegated to node with shard-key = 14 and inner function
-- will not be delegated but fails to insert shard-key = 15 remotely
SELECT force_push_outer(14);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
while executing command on localhost:xxxxx
END;
SELECT * FROM testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
(0 rows)
--
-- Function-1() --> function-2() --> function-3()
--
CREATE OR REPLACE FUNCTION force_push_1(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
CREATE OR REPLACE FUNCTION force_push_2(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
CREATE OR REPLACE FUNCTION force_push_3(z int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT create_distributed_function(
'force_push_1(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'force_push_2(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'force_push_3(int)', 'z',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
TRUNCATE TABLE testnested_table;
BEGIN;
-- All local inserts
SELECT force_push_1(7);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement
SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1"
PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM
while executing command on localhost:xxxxx
END;
BEGIN;
-- Local(shard-keys 13, 15) + remote insert (shard-key 14)
SELECT force_push_1(13);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement
SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1"
PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM
while executing command on localhost:xxxxx
END;
SELECT * FROM testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
(0 rows)
TRUNCATE TABLE testnested_table;
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM FROM forcepushdown_schema.force_push_inner(x);
INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT create_distributed_function(
'force_push_inner(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function(
'force_push_outer(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
create_distributed_function
---------------------------------------------------------------------
(1 row)
BEGIN;
SELECT force_push_outer(7);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1)"
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at SQL statement
while executing command on localhost:xxxxx
END;
TABLE testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
(0 rows)
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
RAISE NOTICE '%', y;
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM FROM forcepushdown_schema.force_push_inner(x+1);
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
BEGIN;
SELECT force_push_outer(9);
DEBUG: pushing down function call in a multi-statement transaction
DEBUG: pushing down the function call
NOTICE: 10
DETAIL: from localhost:xxxxx
force_push_outer
---------------------------------------------------------------------
(1 row)
END;
TABLE testnested_table ORDER BY 1;
x | y
---------------------------------------------------------------------
9 | 9
(1 row)
RESET client_min_messages; RESET client_min_messages;
SET citus.log_remote_commands TO off; SET citus.log_remote_commands TO off;
DROP SCHEMA forcepushdown_schema CASCADE; DROP SCHEMA forcepushdown_schema CASCADE;
NOTICE: drop cascades to 38 other objects NOTICE: drop cascades to 46 other objects
DETAIL: drop cascades to table test_forcepushdown DETAIL: drop cascades to table test_forcepushdown
drop cascades to table test_forcepushdown_noncolocate drop cascades to table test_forcepushdown_noncolocate
drop cascades to function insert_data(integer) drop cascades to function insert_data(integer)
@ -1519,3 +1980,11 @@ drop cascades to function test_prepare(integer,integer)
drop cascades to function outer_test_prepare(integer,integer) drop cascades to function outer_test_prepare(integer,integer)
drop cascades to table test_perform drop cascades to table test_perform
drop cascades to function test(integer) drop cascades to function test(integer)
drop cascades to table testnested_table
drop cascades to function inner_fn(integer)
drop cascades to function outer_fn(integer,integer)
drop cascades to function force_push_inner(integer)
drop cascades to function force_push_outer(integer)
drop cascades to function force_push_1(integer)
drop cascades to function force_push_2(integer)
drop cascades to function force_push_3(integer)

View File

@ -687,6 +687,250 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
CREATE TABLE testnested_table (x int, y int);
SELECT create_distributed_table('testnested_table','x');
CREATE OR REPLACE FUNCTION inner_fn(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
-- Non-force function calling force-delegation function
CREATE OR REPLACE FUNCTION outer_local_fn()
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM 1 FROM inner_fn(1);
INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3);
PERFORM 1 FROM inner_fn(4);
INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6);
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('inner_fn(int)','x',
colocate_with:='testnested_table', force_delegation := true);
SELECT outer_local_fn();
-- Rows from 1-6 should appear
SELECT * FROM testnested_table ORDER BY 1;
BEGIN;
SELECT outer_local_fn();
END;
SELECT * FROM testnested_table ORDER BY 1;
DROP FUNCTION inner_fn(int);
DROP FUNCTION outer_local_fn();
TRUNCATE TABLE testnested_table;
CREATE OR REPLACE FUNCTION inner_fn(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
-- Force-delegation function calling non-force function
CREATE OR REPLACE FUNCTION outer_fn(y int, z int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM 1 FROM forcepushdown_schema.inner_fn(y);
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
PERFORM 1 FROM forcepushdown_schema.inner_fn(z);
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('inner_fn(int)','x',
colocate_with:='testnested_table', force_delegation := false);
SELECT create_distributed_function('outer_fn(int, int)','y',
colocate_with:='testnested_table', force_delegation := true);
SELECT outer_fn(1, 2);
BEGIN;
SELECT outer_fn(1, 2);
END;
-- No rows
SELECT * FROM testnested_table ORDER BY 1;
-- Force-delegation function calling force-delegation function
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function(
'force_push_outer(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
SELECT create_distributed_function(
'force_push_inner(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
-- Keys 7,8,9,14 fall on one node and 15 on a different node
-- Function gets delegated to node with shard-key = 7 and inner function
-- will not be delegated but inserts shard-key = 8 locally
SELECT force_push_outer(7);
BEGIN;
-- Function gets delegated to node with shard-key = 8 and inner function
-- will not be delegated but inserts shard-key = 9 locally
SELECT force_push_outer(8);
END;
BEGIN;
-- Function gets delegated to node with shard-key = 14 and inner function
-- will not be delegated but fails to insert shard-key = 15 remotely
SELECT force_push_outer(14);
END;
SELECT * FROM testnested_table ORDER BY 1;
--
-- Function-1() --> function-2() --> function-3()
--
CREATE OR REPLACE FUNCTION force_push_1(x int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION force_push_2(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION force_push_3(z int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function(
'force_push_1(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
SELECT create_distributed_function(
'force_push_2(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
SELECT create_distributed_function(
'force_push_3(int)', 'z',
colocate_with := 'testnested_table',
force_delegation := true
);
TRUNCATE TABLE testnested_table;
BEGIN;
-- All local inserts
SELECT force_push_1(7);
END;
BEGIN;
-- Local(shard-keys 13, 15) + remote insert (shard-key 14)
SELECT force_push_1(13);
END;
SELECT * FROM testnested_table ORDER BY 1;
TRUNCATE TABLE testnested_table;
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM FROM forcepushdown_schema.force_push_inner(x);
INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1);
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function(
'force_push_inner(int)', 'y',
colocate_with := 'testnested_table',
force_delegation := true
);
SELECT create_distributed_function(
'force_push_outer(int)', 'x',
colocate_with := 'testnested_table',
force_delegation := true
);
BEGIN;
SELECT force_push_outer(7);
END;
TABLE testnested_table ORDER BY 1;
CREATE OR REPLACE FUNCTION force_push_inner(y int)
RETURNS void
AS $$
DECLARE
BEGIN
RAISE NOTICE '%', y;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION force_push_outer(x int)
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM FROM forcepushdown_schema.force_push_inner(x+1);
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
END;
$$ LANGUAGE plpgsql;
BEGIN;
SELECT force_push_outer(9);
END;
TABLE testnested_table ORDER BY 1;
RESET client_min_messages; RESET client_min_messages;
SET citus.log_remote_commands TO off; SET citus.log_remote_commands TO off;
DROP SCHEMA forcepushdown_schema CASCADE; DROP SCHEMA forcepushdown_schema CASCADE;