mirror of https://github.com/citusdata/citus.git
Force-delegated functions' distribution argument must be reset as soon as the routine completes execution,
and not wait until the top level Executor ends. This fixes issue #5687velioglu/tmpfuncprop^2
parent
754d894375
commit
46fa47beea
|
@ -237,16 +237,17 @@ CitusExecutorRun(QueryDesc *queryDesc,
|
|||
* transactions.
|
||||
*/
|
||||
CitusTableCacheFlushInvalidatedEntries();
|
||||
|
||||
/*
|
||||
* Within a 2PC, when a function is delegated to a remote node, we pin
|
||||
* the distribution argument as the shard key for all the SQL in the
|
||||
* function's block. The restriction is imposed to not to access other
|
||||
* nodes from the current node and violate the transactional integrity
|
||||
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
|
||||
*/
|
||||
ResetAllowedShardKeyValue();
|
||||
InTopLevelDelegatedFunctionCall = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Within a 2PC, when a function is delegated to a remote node, we pin
|
||||
* the distribution argument as the shard key for all the SQL in the
|
||||
* function's block. The restriction is imposed to not to access other
|
||||
* nodes from the current node, and violate the transactional integrity
|
||||
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
|
||||
*/
|
||||
CheckAndResetAllowedShardKeyValueIfNeeded();
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
|
@ -260,13 +261,15 @@ CitusExecutorRun(QueryDesc *queryDesc,
|
|||
|
||||
if (ExecutorLevel == 0 && PlannerLevel == 0)
|
||||
{
|
||||
/*
|
||||
* In case of an exception, reset the pinned shard-key, for more
|
||||
* details see the function header.
|
||||
*/
|
||||
ResetAllowedShardKeyValue();
|
||||
InTopLevelDelegatedFunctionCall = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* In case of an exception, reset the pinned shard-key, for more
|
||||
* details see the function header.
|
||||
*/
|
||||
CheckAndResetAllowedShardKeyValueIfNeeded();
|
||||
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
|
|
@ -723,6 +723,16 @@ FunctionInFromClause(List *fromlist, Query *query)
|
|||
static void
|
||||
EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
|
||||
{
|
||||
/*
|
||||
* If the distribution key is already set, the key is fixed until
|
||||
* the force-delegation function returns. All nested force-delegation
|
||||
* functions must use the same key.
|
||||
*/
|
||||
if (AllowedDistributionColumnValue.isActive)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* The saved distribution argument need to persist through the life
|
||||
* of the query, both during the planning (where we save) and execution
|
||||
|
@ -734,6 +744,7 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
|
|||
colocationId));
|
||||
AllowedDistributionColumnValue.distributionColumnValue = copyObject(distArgument);
|
||||
AllowedDistributionColumnValue.colocationId = colocationId;
|
||||
AllowedDistributionColumnValue.executorLevel = ExecutorLevel;
|
||||
AllowedDistributionColumnValue.isActive = true;
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
@ -747,15 +758,22 @@ EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
|
|||
* the 2PC. Reset the distribution argument value once the function ends.
|
||||
*/
|
||||
void
|
||||
ResetAllowedShardKeyValue(void)
|
||||
CheckAndResetAllowedShardKeyValueIfNeeded(void)
|
||||
{
|
||||
if (AllowedDistributionColumnValue.isActive)
|
||||
/*
|
||||
* If no distribution argument is pinned or the pinned argument was
|
||||
* set by a nested-executor from upper level, nothing to reset.
|
||||
*/
|
||||
if (!AllowedDistributionColumnValue.isActive ||
|
||||
ExecutorLevel > AllowedDistributionColumnValue.executorLevel)
|
||||
{
|
||||
pfree(AllowedDistributionColumnValue.distributionColumnValue);
|
||||
AllowedDistributionColumnValue.isActive = false;
|
||||
return;
|
||||
}
|
||||
|
||||
InTopLevelDelegatedFunctionCall = false;
|
||||
Assert(ExecutorLevel == AllowedDistributionColumnValue.executorLevel);
|
||||
pfree(AllowedDistributionColumnValue.distributionColumnValue);
|
||||
AllowedDistributionColumnValue.isActive = false;
|
||||
AllowedDistributionColumnValue.executorLevel = 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -767,6 +785,7 @@ bool
|
|||
IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId)
|
||||
{
|
||||
Assert(AllowedDistributionColumnValue.isActive);
|
||||
Assert(ExecutorLevel > AllowedDistributionColumnValue.executorLevel);
|
||||
|
||||
ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d",
|
||||
pretty_format_node_dump(
|
||||
|
|
|
@ -557,7 +557,8 @@ ResetGlobalVariables()
|
|||
MetadataSyncOnCommit = false;
|
||||
InTopLevelDelegatedFunctionCall = false;
|
||||
ResetWorkerErrorIndication();
|
||||
AllowedDistributionColumnValue.isActive = false;
|
||||
memset(&AllowedDistributionColumnValue, 0,
|
||||
sizeof(AllowedDistributionColumn));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ extern bool InTopLevelDelegatedFunctionCall;
|
|||
extern bool InDelegatedProcedureCall;
|
||||
|
||||
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);
|
||||
extern void ResetAllowedShardKeyValue(void);
|
||||
extern void CheckAndResetAllowedShardKeyValueIfNeeded(void);
|
||||
extern bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId);
|
||||
|
||||
#endif /* FUNCTION_CALL_DELEGATION_H */
|
||||
|
|
|
@ -70,6 +70,9 @@ typedef struct AllowedDistributionColumn
|
|||
Const *distributionColumnValue;
|
||||
uint32 colocationId;
|
||||
bool isActive;
|
||||
|
||||
/* In nested executor, track the level at which value is set */
|
||||
int executorLevel;
|
||||
} AllowedDistributionColumn;
|
||||
|
||||
/*
|
||||
|
|
|
@ -1418,10 +1418,449 @@ NOTICE: INPUT 3
|
|||
CONTEXT: PL/pgSQL function test(integer) line XX at RAISE
|
||||
SQL statement "SELECT test(3)"
|
||||
PL/pgSQL function inline_code_block line XX at PERFORM
|
||||
CREATE TABLE testnested_table (x int, y int);
|
||||
SELECT create_distributed_table('testnested_table','x');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE FUNCTION inner_fn(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Non-force function calling force-delegation function
|
||||
CREATE OR REPLACE FUNCTION outer_local_fn()
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM 1 FROM inner_fn(1);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3);
|
||||
PERFORM 1 FROM inner_fn(4);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT create_distributed_function('inner_fn(int)','x',
|
||||
colocate_with:='testnested_table', force_delegation := true);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT outer_local_fn();
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)"
|
||||
PL/pgSQL function outer_local_fn() line XX at PERFORM
|
||||
DEBUG: pushing down the function call
|
||||
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)"
|
||||
PL/pgSQL function outer_local_fn() line XX at PERFORM
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)"
|
||||
PL/pgSQL function outer_local_fn() line XX at PERFORM
|
||||
DEBUG: pushing down the function call
|
||||
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(4)"
|
||||
PL/pgSQL function outer_local_fn() line XX at PERFORM
|
||||
outer_local_fn
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Rows from 1-6 should appear
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
2 | 3
|
||||
4 | 4
|
||||
5 | 6
|
||||
(4 rows)
|
||||
|
||||
BEGIN;
|
||||
SELECT outer_local_fn();
|
||||
outer_local_fn
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
1 | 1
|
||||
2 | 3
|
||||
2 | 3
|
||||
4 | 4
|
||||
4 | 4
|
||||
5 | 6
|
||||
5 | 6
|
||||
(8 rows)
|
||||
|
||||
DROP FUNCTION inner_fn(int);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
DROP FUNCTION outer_local_fn();
|
||||
TRUNCATE TABLE testnested_table;
|
||||
CREATE OR REPLACE FUNCTION inner_fn(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Force-delegation function calling non-force function
|
||||
CREATE OR REPLACE FUNCTION outer_fn(y int, z int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM 1 FROM forcepushdown_schema.inner_fn(y);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
PERFORM 1 FROM forcepushdown_schema.inner_fn(z);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT create_distributed_function('inner_fn(int)','x',
|
||||
colocate_with:='testnested_table', force_delegation := false);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function('outer_fn(int, int)','y',
|
||||
colocate_with:='testnested_table', force_delegation := true);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT outer_fn(1, 2);
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)"
|
||||
PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement
|
||||
SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)"
|
||||
PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
BEGIN;
|
||||
SELECT outer_fn(1, 2);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x)"
|
||||
PL/pgSQL function forcepushdown_schema.inner_fn(integer) line XX at SQL statement
|
||||
SQL statement "SELECT 1 FROM forcepushdown_schema.inner_fn(z)"
|
||||
PL/pgSQL function forcepushdown_schema.outer_fn(integer,integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
-- No rows
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- Force-delegation function calling force-delegation function
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT create_distributed_function(
|
||||
'force_push_outer(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_inner(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Keys 7,8,9,14 fall on one node and 15 on a different node
|
||||
-- Function gets delegated to node with shard-key = 7 and inner function
|
||||
-- will not be delegated but inserts shard-key = 8 locally
|
||||
SELECT force_push_outer(7);
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
|
||||
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
BEGIN;
|
||||
-- Function gets delegated to node with shard-key = 8 and inner function
|
||||
-- will not be delegated but inserts shard-key = 9 locally
|
||||
SELECT force_push_outer(8);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
|
||||
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
BEGIN;
|
||||
-- Function gets delegated to node with shard-key = 14 and inner function
|
||||
-- will not be delegated but fails to insert shard-key = 15 remotely
|
||||
SELECT force_push_outer(14);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_inner(integer) line XX at SQL statement
|
||||
SQL statement "SELECT forcepushdown_schema.force_push_inner(x+1) LIMIT 1"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
--
|
||||
-- Function-1() --> function-2() --> function-3()
|
||||
--
|
||||
CREATE OR REPLACE FUNCTION force_push_1(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION force_push_2(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION force_push_3(z int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT create_distributed_function(
|
||||
'force_push_1(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_2(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_3(int)', 'z',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
TRUNCATE TABLE testnested_table;
|
||||
BEGIN;
|
||||
-- All local inserts
|
||||
SELECT force_push_1(7);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement
|
||||
SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
BEGIN;
|
||||
-- Local(shard-keys 13, 15) + remote insert (shard-key 14)
|
||||
SELECT force_push_1(13);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_2(integer) line XX at SQL statement
|
||||
SQL statement "SELECT forcepushdown_schema.force_push_2(x+1) LIMIT 1"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_1(integer) line XX at PERFORM
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
TRUNCATE TABLE testnested_table;
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM FROM forcepushdown_schema.force_push_inner(x);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
SELECT create_distributed_function(
|
||||
'force_push_inner(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_outer(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT force_push_outer(7);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: queries must filter by the distribution argument in the same colocation group when using the forced function pushdown
|
||||
HINT: consider disabling forced delegation through create_distributed_table(..., force_delegation := false)
|
||||
CONTEXT: SQL statement "INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1)"
|
||||
PL/pgSQL function forcepushdown_schema.force_push_outer(integer) line XX at SQL statement
|
||||
while executing command on localhost:xxxxx
|
||||
END;
|
||||
TABLE testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
RAISE NOTICE '%', y;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM FROM forcepushdown_schema.force_push_inner(x+1);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
|
||||
BEGIN;
|
||||
SELECT force_push_outer(9);
|
||||
DEBUG: pushing down function call in a multi-statement transaction
|
||||
DEBUG: pushing down the function call
|
||||
NOTICE: 10
|
||||
DETAIL: from localhost:xxxxx
|
||||
force_push_outer
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
TABLE testnested_table ORDER BY 1;
|
||||
x | y
|
||||
---------------------------------------------------------------------
|
||||
9 | 9
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SET citus.log_remote_commands TO off;
|
||||
DROP SCHEMA forcepushdown_schema CASCADE;
|
||||
NOTICE: drop cascades to 38 other objects
|
||||
NOTICE: drop cascades to 46 other objects
|
||||
DETAIL: drop cascades to table test_forcepushdown
|
||||
drop cascades to table test_forcepushdown_noncolocate
|
||||
drop cascades to function insert_data(integer)
|
||||
|
@ -1460,3 +1899,11 @@ drop cascades to function test_prepare(integer,integer)
|
|||
drop cascades to function outer_test_prepare(integer,integer)
|
||||
drop cascades to table test_perform
|
||||
drop cascades to function test(integer)
|
||||
drop cascades to table testnested_table
|
||||
drop cascades to function inner_fn(integer)
|
||||
drop cascades to function outer_fn(integer,integer)
|
||||
drop cascades to function force_push_inner(integer)
|
||||
drop cascades to function force_push_outer(integer)
|
||||
drop cascades to function force_push_1(integer)
|
||||
drop cascades to function force_push_2(integer)
|
||||
drop cascades to function force_push_3(integer)
|
||||
|
|
|
@ -683,6 +683,250 @@ BEGIN
|
|||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TABLE testnested_table (x int, y int);
|
||||
SELECT create_distributed_table('testnested_table','x');
|
||||
|
||||
CREATE OR REPLACE FUNCTION inner_fn(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Non-force function calling force-delegation function
|
||||
CREATE OR REPLACE FUNCTION outer_local_fn()
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM 1 FROM inner_fn(1);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (2,3);
|
||||
PERFORM 1 FROM inner_fn(4);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (5,6);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT create_distributed_function('inner_fn(int)','x',
|
||||
colocate_with:='testnested_table', force_delegation := true);
|
||||
|
||||
SELECT outer_local_fn();
|
||||
-- Rows from 1-6 should appear
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
|
||||
BEGIN;
|
||||
SELECT outer_local_fn();
|
||||
END;
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
|
||||
DROP FUNCTION inner_fn(int);
|
||||
DROP FUNCTION outer_local_fn();
|
||||
TRUNCATE TABLE testnested_table;
|
||||
|
||||
CREATE OR REPLACE FUNCTION inner_fn(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Force-delegation function calling non-force function
|
||||
CREATE OR REPLACE FUNCTION outer_fn(y int, z int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM 1 FROM forcepushdown_schema.inner_fn(y);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
PERFORM 1 FROM forcepushdown_schema.inner_fn(z);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT create_distributed_function('inner_fn(int)','x',
|
||||
colocate_with:='testnested_table', force_delegation := false);
|
||||
SELECT create_distributed_function('outer_fn(int, int)','y',
|
||||
colocate_with:='testnested_table', force_delegation := true);
|
||||
|
||||
SELECT outer_fn(1, 2);
|
||||
BEGIN;
|
||||
SELECT outer_fn(1, 2);
|
||||
END;
|
||||
|
||||
-- No rows
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
|
||||
-- Force-delegation function calling force-delegation function
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
PERFORM forcepushdown_schema.force_push_inner(x+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_outer(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
SELECT create_distributed_function(
|
||||
'force_push_inner(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
|
||||
-- Keys 7,8,9,14 fall on one node and 15 on a different node
|
||||
|
||||
-- Function gets delegated to node with shard-key = 7 and inner function
|
||||
-- will not be delegated but inserts shard-key = 8 locally
|
||||
SELECT force_push_outer(7);
|
||||
|
||||
BEGIN;
|
||||
-- Function gets delegated to node with shard-key = 8 and inner function
|
||||
-- will not be delegated but inserts shard-key = 9 locally
|
||||
SELECT force_push_outer(8);
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- Function gets delegated to node with shard-key = 14 and inner function
|
||||
-- will not be delegated but fails to insert shard-key = 15 remotely
|
||||
SELECT force_push_outer(14);
|
||||
END;
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
|
||||
--
|
||||
-- Function-1() --> function-2() --> function-3()
|
||||
--
|
||||
CREATE OR REPLACE FUNCTION force_push_1(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
PERFORM forcepushdown_schema.force_push_2(x+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION force_push_2(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
PERFORM forcepushdown_schema.force_push_3(y+1) LIMIT 1;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION force_push_3(z int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (z,z);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'force_push_1(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
SELECT create_distributed_function(
|
||||
'force_push_2(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
SELECT create_distributed_function(
|
||||
'force_push_3(int)', 'z',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
|
||||
TRUNCATE TABLE testnested_table;
|
||||
BEGIN;
|
||||
-- All local inserts
|
||||
SELECT force_push_1(7);
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- Local(shard-keys 13, 15) + remote insert (shard-key 14)
|
||||
SELECT force_push_1(13);
|
||||
END;
|
||||
|
||||
SELECT * FROM testnested_table ORDER BY 1;
|
||||
|
||||
TRUNCATE TABLE testnested_table;
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (y,y);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM FROM forcepushdown_schema.force_push_inner(x);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x+1,x+1);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
SELECT create_distributed_function(
|
||||
'force_push_inner(int)', 'y',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
SELECT create_distributed_function(
|
||||
'force_push_outer(int)', 'x',
|
||||
colocate_with := 'testnested_table',
|
||||
force_delegation := true
|
||||
);
|
||||
|
||||
BEGIN;
|
||||
SELECT force_push_outer(7);
|
||||
END;
|
||||
TABLE testnested_table ORDER BY 1;
|
||||
|
||||
CREATE OR REPLACE FUNCTION force_push_inner(y int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
RAISE NOTICE '%', y;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION force_push_outer(x int)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM FROM forcepushdown_schema.force_push_inner(x+1);
|
||||
INSERT INTO forcepushdown_schema.testnested_table VALUES (x,x);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
BEGIN;
|
||||
SELECT force_push_outer(9);
|
||||
END;
|
||||
TABLE testnested_table ORDER BY 1;
|
||||
|
||||
RESET client_min_messages;
|
||||
SET citus.log_remote_commands TO off;
|
||||
DROP SCHEMA forcepushdown_schema CASCADE;
|
||||
|
|
Loading…
Reference in New Issue