mirror of https://github.com/citusdata/citus.git
Fix RESET and other types of SET
parent
8617838fd6
commit
07d2266e11
|
@ -181,15 +181,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
if (IsA(parsetree, VariableSetStmt))
|
||||
{
|
||||
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
|
||||
bool propagateSetVar = (PropagateSetCommands == PROPSETCMD_LOCAL &&
|
||||
setStmt->is_local);
|
||||
bool setVarIsValid = SetCommandTargetIsValid(setStmt);
|
||||
|
||||
/* at present, we only implement the NONE and LOCAL behaviors */
|
||||
AssertState(PropagateSetCommands == PROPSETCMD_NONE ||
|
||||
PropagateSetCommands == PROPSETCMD_LOCAL);
|
||||
|
||||
if (propagateSetVar && setVarIsValid && IsMultiStatementTransaction())
|
||||
if (IsMultiStatementTransaction() && ShouldPropagateSetCommand(setStmt))
|
||||
{
|
||||
ProcessVariableSetStmt(setStmt, queryString);
|
||||
}
|
||||
|
|
|
@ -27,34 +27,87 @@
|
|||
#include "distributed/remote_commands.h"
|
||||
|
||||
|
||||
static bool IsSettingSafeToPropagate(char *name);
|
||||
|
||||
|
||||
/*
|
||||
* Checks whether a SET command modifies a variable which might violate assumptions
|
||||
* made by Citus. Because we generally don't want a connection to be destroyed on error,
|
||||
* and because we eagerly ensure the stack can be fully allocated (at backend startup),
|
||||
* permitting changes to these two variables seems unwise. Also, ban propagating the
|
||||
* SET command propagation setting (not for correctness, more to avoid confusion).
|
||||
* ShouldPropagateSetCommand determines whether a SET or RESET command should be
|
||||
* propagated to the workers.
|
||||
*
|
||||
* We currently propagate:
|
||||
* - SET LOCAL (for allowed settings)
|
||||
* - RESET (for allowed settings)
|
||||
* - RESET ALL
|
||||
*/
|
||||
bool
|
||||
SetCommandTargetIsValid(VariableSetStmt *setStmt)
|
||||
ShouldPropagateSetCommand(VariableSetStmt *setStmt)
|
||||
{
|
||||
/* if this list grows considerably, switch to bsearch */
|
||||
const char *blacklist[] = {
|
||||
if (PropagateSetCommands != PROPSETCMD_LOCAL)
|
||||
{
|
||||
/* SET propagation is disabled */
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (setStmt->kind)
|
||||
{
|
||||
case VAR_SET_VALUE:
|
||||
case VAR_SET_CURRENT:
|
||||
case VAR_SET_DEFAULT:
|
||||
{
|
||||
/* SET LOCAL on a safe setting */
|
||||
return setStmt->is_local && IsSettingSafeToPropagate(setStmt->name);
|
||||
}
|
||||
|
||||
case VAR_RESET:
|
||||
{
|
||||
/* may need to reset prior SET LOCAL */
|
||||
return IsSettingSafeToPropagate(setStmt->name);
|
||||
}
|
||||
|
||||
case VAR_RESET_ALL:
|
||||
{
|
||||
/* always propagate RESET ALL since it might affect prior SET LOCALs */
|
||||
return true;
|
||||
}
|
||||
|
||||
case VAR_SET_MULTI:
|
||||
default:
|
||||
{
|
||||
/* SET (LOCAL) TRANSACTION should be handled locally */
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSettingSafeToPropagate returns whether a SET LOCAL is safe to propagate.
|
||||
*
|
||||
* We exclude settings that are highly specific to the client or session and also ban
|
||||
* ban propagating the SET command propagation setting (not for correctness, more to
|
||||
* avoid confusion).
|
||||
*/
|
||||
static bool
|
||||
IsSettingSafeToPropagate(char *name)
|
||||
{
|
||||
/* if this list grows considerably we should switch to bsearch */
|
||||
const char *skipSettings[] = {
|
||||
"citus.propagate_set_commands",
|
||||
"client_encoding",
|
||||
"exit_on_error",
|
||||
"max_stack_depth"
|
||||
};
|
||||
Index idx = 0;
|
||||
Index settingIndex = 0;
|
||||
|
||||
for (idx = 0; idx < lengthof(blacklist); idx++)
|
||||
for (settingIndex = 0; settingIndex < lengthof(skipSettings); settingIndex++)
|
||||
{
|
||||
if (pg_strcasecmp(blacklist[idx], setStmt->name))
|
||||
if (pg_strcasecmp(skipSettings[settingIndex], name) == 0)
|
||||
{
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ extern void ProcessTruncateStatement(TruncateStmt *truncateStatement);
|
|||
/* vacuum.c - froward declarations */
|
||||
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||
|
||||
extern bool SetCommandTargetIsValid(VariableSetStmt *setStmt);
|
||||
extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
|
||||
extern void ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
|
||||
|
||||
#endif /*CITUS_COMMANDS_H */
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
CREATE SCHEMA propagate_set_commands;
|
||||
SET search_path TO propagate_set_commands;
|
||||
CREATE TABLE test (id int, value int);
|
||||
SELECT create_distributed_table('test', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test VALUES (1,1), (3,3);
|
||||
-- test set local propagation
|
||||
SET citus.propagate_set_commands TO 'local';
|
||||
-- make sure we send BEGIN before a SELECT
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.select_opens_transaction_block TO on;
|
||||
BEGIN;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
-- should not be propagated, error should be coming from coordinator
|
||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query
|
||||
END;
|
||||
BEGIN;
|
||||
-- set session commands are not propagated
|
||||
SET enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
BEGIN;
|
||||
-- should not propagate exit_on_error
|
||||
SET LOCAL exit_on_error TO on;
|
||||
SELECT current_setting('exit_on_error') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
-- expand to new node, set should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
-- should be back on after set to default
|
||||
SET LOCAL enable_hashagg TO DEFAULT;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
-- expand to new node, set to default should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
-- does not have the LOCAL keyword, not propagated
|
||||
SET enable_hashagg TO DEFAULT;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
-- should be back on after reset
|
||||
RESET enable_hashagg;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
-- expand to new node, reset should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
-- should be back on after reset all
|
||||
RESET ALL;
|
||||
SET search_path = 'propagate_set_commands';
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
-- funky case, we reset citus.propagate_set_commands, so not set again
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
current_setting
|
||||
-----------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
DROP SCHEMA propagate_set_commands CASCADE;
|
||||
NOTICE: drop cascades to table test
|
|
@ -66,7 +66,7 @@ test: multi_explain
|
|||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||
test: sql_procedure multi_function_in_join
|
||||
test: multi_subquery_in_where_reference_clause full_join adaptive_executor
|
||||
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
|
||||
test: multi_reference_table multi_select_for_update relation_access_tracking
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
CREATE SCHEMA propagate_set_commands;
|
||||
SET search_path TO propagate_set_commands;
|
||||
|
||||
CREATE TABLE test (id int, value int);
|
||||
SELECT create_distributed_table('test', 'id');
|
||||
INSERT INTO test VALUES (1,1), (3,3);
|
||||
|
||||
-- test set local propagation
|
||||
SET citus.propagate_set_commands TO 'local';
|
||||
|
||||
-- make sure we send BEGIN before a SELECT
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.select_opens_transaction_block TO on;
|
||||
|
||||
BEGIN;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
-- should not be propagated, error should be coming from coordinator
|
||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- set session commands are not propagated
|
||||
SET enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
ABORT;
|
||||
|
||||
BEGIN;
|
||||
-- should not propagate exit_on_error
|
||||
SET LOCAL exit_on_error TO on;
|
||||
SELECT current_setting('exit_on_error') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- expand to new node, set should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- should be back on after set to default
|
||||
SET LOCAL enable_hashagg TO DEFAULT;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- expand to new node, set to default should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- does not have the LOCAL keyword, not propagated
|
||||
SET enable_hashagg TO DEFAULT;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- should be back on after reset
|
||||
RESET enable_hashagg;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- expand to new node, reset should still apply
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- should be off on worker
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- should be back on after reset all
|
||||
RESET ALL;
|
||||
SET search_path = 'propagate_set_commands';
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
|
||||
-- funky case, we reset citus.propagate_set_commands, so not set again
|
||||
SET LOCAL enable_hashagg TO false;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 3;
|
||||
ABORT;
|
||||
|
||||
DROP SCHEMA propagate_set_commands CASCADE;
|
Loading…
Reference in New Issue