Implement propagation of SET LOCAL commands

Adds support for propagation of SET LOCAL commands to all workers
involved in a query. For now, SET SESSION (i.e. plain SET) is not
supported whatsoever, though this code is intended as somewhat of a
base for implementing such support in the future.

As SET LOCAL modifications are scoped to the body of a BEGIN/END xact
block, queries wishing to use SET LOCAL propagation must be within such
a block. In addition, any subsequent SAVEPOINT or ROLLBACK statements
will correspondingly push or pop variable modifications onto an
internal stack, such that the behavior of changed
values across the cluster will be identical to such behavior on e.g.
single-node PostgreSQL (or equivalently, what values are visible to
the end user by running SHOW on such variables on the coordinator).

If nodes enter the set of participants at some point after SET LOCAL
modifications (or SAVEPOINT, ROLLBACK, etc.) have occurred, the SET
variable state is eagerly propagated to them upon their entrance (this
is identical to, and indeed just augments, the existing logic for the
propagation of the SAVEPOINT "stack").

A new GUC (citus.propagate_set_commands) has been added to control this
behavior. Though the code suggests the valid settings are 'none', 'local',
'session', and 'all', only 'none' (the default) and 'local' are presently
implemented: attempting to use other values will result in an error.
pull/2689/head
Jason Petersen 2019-04-25 12:16:34 -06:00
parent 1dec6c5163
commit d4e1172247
11 changed files with 502 additions and 16 deletions

View File

@ -56,6 +56,7 @@
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
static bool shouldInvalidateForeignKeyGraph = false;
static int activeAlterTables = 0;
@ -176,6 +177,24 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
#endif
/* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */
if (IsA(parsetree, VariableSetStmt))
{
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
bool propagateSetVar = (PropagateSetCommands == PROPSETCMD_LOCAL &&
setStmt->is_local);
bool setVarIsValid = SetCommandTargetIsValid(setStmt);
/* at present, we only implement the NONE and LOCAL behaviors */
AssertState(PropagateSetCommands == PROPSETCMD_NONE ||
PropagateSetCommands == PROPSETCMD_LOCAL);
if (propagateSetVar && setVarIsValid && IsMultiStatementTransaction())
{
ProcessVariableSetStmt(setStmt, queryString);
}
}
/*
* TRANSMIT used to be separate command, but to avoid patching the grammar
* it's now overlaid onto COPY, but with FORMAT = 'transmit' instead of the

View File

@ -0,0 +1,133 @@
/*-------------------------------------------------------------------------
*
* variableset.c
* Support for propagation of SET (commands to set variables)
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "common/string.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "lib/ilist.h"
#include "utils/varlena.h"
#include "distributed/remote_commands.h"
/*
 * SetCommandTargetIsValid checks whether a SET command modifies a variable
 * which might violate assumptions made by Citus. Because we generally don't
 * want a connection to be destroyed on error, and because we eagerly ensure
 * the stack can be fully allocated (at backend startup), permitting changes
 * to these variables seems unwise. Also, ban propagating the SET command
 * propagation setting itself (not for correctness, more to avoid confusion).
 *
 * Returns true if the SET target is safe to propagate, false otherwise.
 */
bool
SetCommandTargetIsValid(VariableSetStmt *setStmt)
{
	/* if this list grows considerably, switch to bsearch */
	const char *blacklist[] = {
		"citus.propagate_set_commands",
		"client_encoding",
		"exit_on_error",
		"max_stack_depth"
	};
	Index idx = 0;

	/*
	 * Variants such as RESET ALL carry no variable name; refuse to propagate
	 * those rather than passing NULL to pg_strcasecmp below.
	 */
	if (setStmt->name == NULL)
	{
		return false;
	}

	for (idx = 0; idx < lengthof(blacklist); idx++)
	{
		/*
		 * pg_strcasecmp returns zero on a match. The previous code tested the
		 * raw (non-zero-on-mismatch) result, which returned true for any name
		 * differing from the first blacklist entry — and eventually true even
		 * for blacklisted names — inverting the intended check.
		 */
		if (pg_strcasecmp(blacklist[idx], setStmt->name) == 0)
		{
			return false;
		}
	}

	return true;
}
/*
 * ProcessVariableSetStmt actually does the work of propagating a provided SET
 * stmt to currently-participating worker nodes and adding the SET command text
 * to a string keeping track of all propagated SET commands since (sub-)xact
 * start.
 */
void
ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString)
{
	dlist_iter iter;
	const bool raiseInterrupts = true;
	List *connectionList = NIL;

	/* at present we only support SET LOCAL */
	AssertArg(setStmt->is_local);

	/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */
	if (activeSetStmts == NULL)
	{
		/*
		 * Allocate in CurTransactionContext so the accumulated SET text has
		 * (sub-)transaction lifetime and is reclaimed automatically at
		 * (sub-)xact end.
		 */
		MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
		activeSetStmts = makeStringInfo();
		MemoryContextSwitchTo(old_context);
	}

	/* send text of SET stmt to participating nodes... */
	dlist_foreach(iter, &InProgressTransactions)
	{
		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
													  iter.cur);
		RemoteTransaction *transaction = NULL;

		transaction = &connection->remoteTransaction;

		/* skip connections whose remote transaction has already failed */
		if (transaction->transactionFailed)
		{
			continue;
		}

		if (!SendRemoteCommand(connection, setStmtString))
		{
			const bool raiseErrors = true;

			HandleRemoteTransactionConnectionError(connection, raiseErrors);
		}

		/* remember which connections we actually sent the command to */
		connectionList = lappend(connectionList, connection);
	}

	/*
	 * Block until every targeted connection is ready; NOTE(review): presumably
	 * SendRemoteCommand above is non-blocking and this is where we wait for
	 * the sends to complete — confirm against remote_commands.c.
	 */
	WaitForAllConnections(connectionList, raiseInterrupts);

	/* ... and wait for the results */
	dlist_foreach(iter, &InProgressTransactions)
	{
		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
													  iter.cur);
		RemoteTransaction *transaction = NULL;
		const bool raiseErrors = true;

		transaction = &connection->remoteTransaction;

		/* same skip condition as the send loop above */
		if (transaction->transactionFailed)
		{
			continue;
		}

		ClearResults(connection, raiseErrors);
	}

	/* SET propagation successful: add to active SET stmt string */
	appendStringInfoString(activeSetStmts, setStmtString);

	/* ensure semicolon on end to allow appending future SET stmts */
	if (!pg_str_endswith(setStmtString, ";"))
	{
		appendStringInfoChar(activeSetStmts, ';');
	}
}

View File

@ -21,6 +21,7 @@
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
@ -59,6 +60,7 @@
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/varlena.h"
/* marks shared object as one loadable by the postgres version compiled against */
PG_MODULE_MAGIC;
@ -86,6 +88,13 @@ static int CitusSSLMode = 0;
/* *INDENT-OFF* */
/* GUC enum definitions */
/*
 * Accepted values for citus.propagate_set_commands. Only 'none' and 'local'
 * are listed here; the PROPSETCMD_SESSION and PROPSETCMD_ALL behaviors exist
 * in the enum but are not yet selectable via the GUC.
 */
static const struct config_enum_entry propagate_set_commands_options[] = {
	{"none", PROPSETCMD_NONE, false},
	{"local", PROPSETCMD_LOCAL, false},
	{NULL, 0, false}
};
static const struct config_enum_entry task_assignment_policy_options[] = {
{ "greedy", TASK_ASSIGNMENT_GREEDY, false },
{ "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false },
@ -630,6 +639,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.propagate_set_commands",
gettext_noop("Sets which SET commands are propagated to workers."),
NULL,
&PropagateSetCommands,
PROPSETCMD_NONE,
propagate_set_commands_options,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_router_execution",
gettext_noop("Enables router execution"),

View File

@ -93,16 +93,31 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
distributedTransactionId->transactionNumber,
timestamp);
/* append in-progress savepoints for this transaction */
activeSubXacts = ActiveSubXacts();
/* append context for in-progress SAVEPOINTs for this transaction */
activeSubXacts = ActiveSubXactContexts();
transaction->lastSuccessfulSubXact = TopSubTransactionId;
transaction->lastQueuedSubXact = TopSubTransactionId;
foreach(subIdCell, activeSubXacts)
{
SubTransactionId subId = lfirst_int(subIdCell);
SubXactContext *subXactState = lfirst(subIdCell);
/* append SET LOCAL state from when SAVEPOINT was encountered... */
if (subXactState->setLocalCmds != NULL)
{
appendStringInfoString(beginAndSetDistributedTransactionId,
subXactState->setLocalCmds->data);
}
/* ... then append SAVEPOINT to enter this subxact */
appendStringInfo(beginAndSetDistributedTransactionId,
"SAVEPOINT savepoint_%u;", subId);
transaction->lastQueuedSubXact = subId;
"SAVEPOINT savepoint_%u;", subXactState->subId);
transaction->lastQueuedSubXact = subXactState->subId;
}
/* we've pushed into deepest subxact: apply in-progress SET context */
if (activeSetStmts != NULL)
{
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
}
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))

View File

@ -46,8 +46,20 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
/* list of connections that are part of the current coordinated transaction */
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
/* stack of active sub-transactions */
static List *activeSubXacts = NIL;
/*
* activeSetStmts keeps track of SET LOCAL statements executed within the current
* subxact and will be set to NULL when pushing into new subxact or ending top xact.
*/
StringInfo activeSetStmts;
/*
* Though a list, we treat this as a stack, pushing on subxact contexts whenever
* e.g. a SAVEPOINT is executed (though this is actually performed by providing
* PostgreSQL with a sub-xact callback). At present, the context of a subxact
* includes a subxact identifier as well as any SET LOCAL statements propagated
* to workers during the sub-transaction.
*/
static List *activeSubXactContexts = NIL;
/* some pre-allocated memory so we don't need to call malloc() during callbacks */
MemoryContext CommitContext = NULL;
@ -213,6 +225,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
UnSetDistributedTransactionId();
@ -265,6 +278,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
FunctionCallLevel = 0;
@ -484,7 +498,16 @@ static void
PushSubXact(SubTransactionId subId)
{
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
activeSubXacts = lcons_int(subId, activeSubXacts);
/* save provided subId as well as propagated SET LOCAL stmts */
SubXactContext *state = palloc(sizeof(SubXactContext));
state->subId = subId;
state->setLocalCmds = activeSetStmts;
/* append to list and reset active set stmts for upcoming sub-xact */
activeSubXactContexts = lcons(state, activeSubXactContexts);
activeSetStmts = makeStringInfo();
MemoryContextSwitchTo(old_context);
}
@ -494,8 +517,17 @@ static void
PopSubXact(SubTransactionId subId)
{
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
Assert(linitial_int(activeSubXacts) == subId);
activeSubXacts = list_delete_first(activeSubXacts);
SubXactContext *state = linitial(activeSubXactContexts);
/*
* the previous activeSetStmts is already invalid because it's in the now-
* aborted subxact (what we're popping), so no need to free before assign-
* ing with the setLocalCmds of the popped context
*/
Assert(state->subId == subId);
activeSetStmts = state->setLocalCmds;
activeSubXactContexts = list_delete_first(activeSubXactContexts);
MemoryContextSwitchTo(old_context);
}
@ -504,23 +536,44 @@ PopSubXact(SubTransactionId subId)
List *
ActiveSubXacts(void)
{
ListCell *subIdCell = NULL;
ListCell *subXactCell = NULL;
List *activeSubXactsReversed = NIL;
/*
* activeSubXacts is in reversed temporal order, so we reverse it to get it
* activeSubXactContexts is in reversed temporal order, so we reverse it to get it
* in temporal order.
*/
foreach(subIdCell, activeSubXacts)
foreach(subXactCell, activeSubXactContexts)
{
SubTransactionId subId = lfirst_int(subIdCell);
activeSubXactsReversed = lcons_int(subId, activeSubXactsReversed);
SubXactContext *state = lfirst(subXactCell);
activeSubXactsReversed = lcons_int(state->subId, activeSubXactsReversed);
}
return activeSubXactsReversed;
}
/*
 * ActiveSubXactContexts returns the list of active sub-xact contexts in
 * temporal (oldest-first) order.
 */
List *
ActiveSubXactContexts(void)
{
	List *contextsInTemporalOrder = NIL;
	ListCell *stackCell = NULL;

	/* the context stack is newest-first; prepending each entry reverses it */
	foreach(stackCell, activeSubXactContexts)
	{
		SubXactContext *subXactState = lfirst(stackCell);

		contextsInTemporalOrder = lcons(subXactState, contextsInTemporalOrder);
	}

	return contextsInTemporalOrder;
}
/*
* If an ERROR is thrown while processing a transaction the ABORT handler is called.
* ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also

View File

@ -123,5 +123,7 @@ extern void ProcessTruncateStatement(TruncateStmt *truncateStatement);
/* vacuum.c - froward declarations */
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
extern bool SetCommandTargetIsValid(VariableSetStmt *setStmt);
extern void ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
#endif /*CITUS_COMMANDS_H */

View File

@ -16,9 +16,17 @@
#include "utils/relcache.h"
#include "tcop/utility.h"
/*
 * Behaviors for propagating SET commands to workers, selected by the
 * citus.propagate_set_commands GUC. Only NONE and LOCAL are implemented
 * at present (the GUC's option table exposes only those two values).
 */
typedef enum
{
	PROPSETCMD_INVALID = -1,
	PROPSETCMD_NONE, /* do not propagate SET commands */
	PROPSETCMD_LOCAL, /* propagate SET LOCAL commands */
	PROPSETCMD_SESSION, /* propagate SET commands, but not SET LOCAL ones */
	PROPSETCMD_ALL /* propagate all SET commands */
} PropSetCmdBehavior;
extern PropSetCmdBehavior PropagateSetCommands;
extern bool EnableDDLPropagation;
/*
* A DDLJob encapsulates the remote tasks and commands needed to process all or
* part of a distributed DDL command. It hold the distributed relation's oid,

View File

@ -10,6 +10,7 @@
#define TRANSACTION_MANAGMENT_H
#include "lib/ilist.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/* describes what kind of modifications have occurred in the current transaction */
@ -53,6 +54,13 @@ typedef enum
COMMIT_PROTOCOL_2PC = 2
} CommitProtocolType;
/* Struct to keep track of context within nested sub-transactions */
typedef struct SubXactContext
{
	SubTransactionId subId;   /* id of the sub-transaction this entry was pushed for */
	StringInfo setLocalCmds;  /* SET LOCAL commands propagated before entering this subxact */
} SubXactContext;
/*
* GUC that determines whether a SELECT in a transaction block should also run in
* a transaction block on the worker.
@ -85,6 +93,8 @@ extern int StoredProcedureLevel;
/* number of nested function call levels we are currently in */
extern int FunctionCallLevel;
/* SET LOCAL statements active in the current (sub-)transaction. */
extern StringInfo activeSetStmts;
/*
* Coordinated transaction management.
@ -99,6 +109,7 @@ extern void InitializeTransactionManagement(void);
/* other functions */
extern List * ActiveSubXacts(void);
extern List * ActiveSubXactContexts(void);
#endif /* TRANSACTION_MANAGMENT_H */

View File

@ -341,6 +341,92 @@ BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR: canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
-- test propagation of SET LOCAL
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
CREATE USER rls_user;
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
SELECT run_command_on_workers('CREATE USER rls_user');
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
run_command_on_workers
---------------------------
(localhost,57637,t,GRANT)
(localhost,57638,t,GRANT)
(2 rows)
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
run_command_on_workers
---------------------------
(localhost,57637,t,GRANT)
(localhost,57638,t,GRANT)
(2 rows)
-- create trigger on one worker to reject access if GUC not
\c - - - :worker_1_port
SET search_path = 'multi_real_time_transaction';
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
\c - - - :master_port
SET ROLE rls_user;
SET search_path = 'multi_real_time_transaction';
-- shouldn't see all rows because of RLS
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
BEGIN;
-- without enabling SET LOCAL prop, still won't work
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
SET LOCAL citus.propagate_set_commands TO 'local';
-- now we should be good to go
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
SAVEPOINT disable_rls;
SET LOCAL app.show_rows TO FALSE;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
ROLLBACK TO SAVEPOINT disable_rls;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
SAVEPOINT disable_rls_for_real;
SET LOCAL app.show_rows TO FALSE;
RELEASE SAVEPOINT disable_rls_for_real;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
COMMIT;
RESET ROLE;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;

View File

@ -349,6 +349,92 @@ BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR: canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
-- test propagation of SET LOCAL
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
CREATE USER rls_user;
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
SELECT run_command_on_workers('CREATE USER rls_user');
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
run_command_on_workers
---------------------------
(localhost,57637,t,GRANT)
(localhost,57638,t,GRANT)
(2 rows)
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
run_command_on_workers
---------------------------
(localhost,57637,t,GRANT)
(localhost,57638,t,GRANT)
(2 rows)
-- create trigger on one worker to reject access if GUC not
\c - - - :worker_1_port
SET search_path = 'multi_real_time_transaction';
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
\c - - - :master_port
SET ROLE rls_user;
SET search_path = 'multi_real_time_transaction';
-- shouldn't see all rows because of RLS
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
BEGIN;
-- without enabling SET LOCAL prop, still won't work
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
SET LOCAL citus.propagate_set_commands TO 'local';
-- now we should be good to go
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
SAVEPOINT disable_rls;
SET LOCAL app.show_rows TO FALSE;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
ROLLBACK TO SAVEPOINT disable_rls;
SELECT COUNT(*) FROM test_table;
count
-------
6
(1 row)
SAVEPOINT disable_rls_for_real;
SET LOCAL app.show_rows TO FALSE;
RELEASE SAVEPOINT disable_rls_for_real;
SELECT COUNT(*) FROM test_table;
count
-------
4
(1 row)
COMMIT;
RESET ROLE;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;

View File

@ -213,6 +213,59 @@ BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ROLLBACK;
-- test propagation of SET LOCAL
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
CREATE USER rls_user;
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
SELECT run_command_on_workers('CREATE USER rls_user');
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
-- create trigger on one worker to reject access if GUC not
\c - - - :worker_1_port
SET search_path = 'multi_real_time_transaction';
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
\c - - - :master_port
SET ROLE rls_user;
SET search_path = 'multi_real_time_transaction';
-- shouldn't see all rows because of RLS
SELECT COUNT(*) FROM test_table;
BEGIN;
-- without enabling SET LOCAL prop, still won't work
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
SET LOCAL citus.propagate_set_commands TO 'local';
-- now we should be good to go
SET LOCAL app.show_rows TO TRUE;
SELECT COUNT(*) FROM test_table;
SAVEPOINT disable_rls;
SET LOCAL app.show_rows TO FALSE;
SELECT COUNT(*) FROM test_table;
ROLLBACK TO SAVEPOINT disable_rls;
SELECT COUNT(*) FROM test_table;
SAVEPOINT disable_rls_for_real;
SET LOCAL app.show_rows TO FALSE;
RELEASE SAVEPOINT disable_rls_for_real;
SELECT COUNT(*) FROM test_table;
COMMIT;
RESET ROLE;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;