diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index d30c1d8a1..4db633c3e 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -56,6 +56,7 @@ bool EnableDDLPropagation = true; /* ddl propagation is enabled */ +PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; static int activeAlterTables = 0; @@ -176,6 +177,24 @@ multi_ProcessUtility(PlannedStmt *pstmt, } #endif + /* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */ + if (IsA(parsetree, VariableSetStmt)) + { + VariableSetStmt *setStmt = (VariableSetStmt *) parsetree; + bool propagateSetVar = (PropagateSetCommands == PROPSETCMD_LOCAL && + setStmt->is_local); + bool setVarIsValid = SetCommandTargetIsValid(setStmt); + + /* at present, we only implement the NONE and LOCAL behaviors */ + AssertState(PropagateSetCommands == PROPSETCMD_NONE || + PropagateSetCommands == PROPSETCMD_LOCAL); + + if (propagateSetVar && setVarIsValid && IsMultiStatementTransaction()) + { + ProcessVariableSetStmt(setStmt, queryString); + } + } + /* * TRANSMIT used to be separate command, but to avoid patching the grammar * it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the diff --git a/src/backend/distributed/commands/variableset.c b/src/backend/distributed/commands/variableset.c new file mode 100644 index 000000000..30e65b8f2 --- /dev/null +++ b/src/backend/distributed/commands/variableset.c @@ -0,0 +1,133 @@ +/*------------------------------------------------------------------------- + * + * variableset.c + * Support for propagation of SET (commands to set variables) + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "c.h" + +#include "common/string.h" +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_router_executor.h" +#include "distributed/resource_lock.h" +#include "distributed/transaction_management.h" +#include "distributed/version_compat.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "lib/ilist.h" +#include "utils/varlena.h" +#include "distributed/remote_commands.h" + + +/* + * Checks whether a SET command modifies a variable which might violate assumptions + * made by Citus. Because we generally don't want a connection to be destroyed on error, + * and because we eagerly ensure the stack can be fully allocated (at backend startup), + * permitting changes to these two variables seems unwise. Also, ban propagating the + * SET command propagation setting (not for correctness, more to avoid confusion). + */ +bool +SetCommandTargetIsValid(VariableSetStmt *setStmt) +{ + /* if this list grows considerably, switch to bsearch */ + const char *blacklist[] = { + "citus.propagate_set_commands", + "client_encoding", + "exit_on_error", + "max_stack_depth" + }; + Index idx = 0; + + for (idx = 0; idx < lengthof(blacklist); idx++) + { + if (pg_strcasecmp(blacklist[idx], setStmt->name)) + { + return true; + } + } + + return false; +} + + +/* + * ProcessVariableSetStmt actually does the work of propagating a provided SET stmt + * to currently-participating worker nodes and adding the SET command test to a string + * keeping track of all propagated SET commands since (sub-)xact start. + */ +void +ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString) +{ + dlist_iter iter; + const bool raiseInterrupts = true; + List *connectionList = NIL; + + /* at present we only support SET LOCAL */ + AssertArg(setStmt->is_local); + + /* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */ + if (activeSetStmts == NULL) + { + MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext); + activeSetStmts = makeStringInfo(); + MemoryContextSwitchTo(old_context); + } + + /* send text of SET stmt to participating nodes... */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = NULL; + + transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + if (!SendRemoteCommand(connection, setStmtString)) + { + const bool raiseErrors = true; + HandleRemoteTransactionConnectionError(connection, raiseErrors); + } + + connectionList = lappend(connectionList, connection); + } + + WaitForAllConnections(connectionList, raiseInterrupts); + + /* ... and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = NULL; + const bool raiseErrors = true; + + transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + ClearResults(connection, raiseErrors); + } + + /* SET propagation successful: add to active SET stmt string */ + appendStringInfoString(activeSetStmts, setStmtString); + + /* ensure semicolon on end to allow appending future SET stmts */ + if (!pg_str_endswith(setStmtString, ";")) + { + appendStringInfoChar(activeSetStmts, ';'); + } +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index adddc90a8..6856cd06c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -21,6 +21,7 @@ #include "executor/executor.h" #include "distributed/backend_data.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/commands.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" @@ -59,6 +60,7 @@ #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/varlena.h" /* marks shared object as one loadable by the postgres version compiled against */ PG_MODULE_MAGIC; @@ -86,6 +88,13 @@ static int CitusSSLMode = 0; /* *INDENT-OFF* */ /* GUC enum definitions */ +static const struct config_enum_entry propagate_set_commands_options[] = { + {"none", PROPSETCMD_NONE, false}, + {"local", PROPSETCMD_LOCAL, false}, + {NULL, 0, false} +}; + + static const struct config_enum_entry task_assignment_policy_options[] = { { "greedy", TASK_ASSIGNMENT_GREEDY, false }, { "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false }, @@ -630,6 +639,17 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.propagate_set_commands", + gettext_noop("Sets which SET commands are propagated to workers."), + NULL, + &PropagateSetCommands, + PROPSETCMD_NONE, + propagate_set_commands_options, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_router_execution", gettext_noop("Enables router execution"), diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 5df7a7ad8..44e924a14 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -93,16 +93,31 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) distributedTransactionId->transactionNumber, timestamp); - /* append in-progress savepoints for this transaction */ - activeSubXacts = ActiveSubXacts(); + /* append context for in-progress SAVEPOINTs for this transaction */ + activeSubXacts = ActiveSubXactContexts(); transaction->lastSuccessfulSubXact = TopSubTransactionId; transaction->lastQueuedSubXact = TopSubTransactionId; foreach(subIdCell, activeSubXacts) { - SubTransactionId subId = lfirst_int(subIdCell); + SubXactContext *subXactState = lfirst(subIdCell); + + /* append SET LOCAL state from when SAVEPOINT was encountered... */ + if (subXactState->setLocalCmds != NULL) + { + appendStringInfoString(beginAndSetDistributedTransactionId, + subXactState->setLocalCmds->data); + } + + /* ... then append SAVEPOINT to enter this subxact */ appendStringInfo(beginAndSetDistributedTransactionId, - "SAVEPOINT savepoint_%u;", subId); - transaction->lastQueuedSubXact = subId; + "SAVEPOINT savepoint_%u;", subXactState->subId); + transaction->lastQueuedSubXact = subXactState->subId; + } + + /* we've pushed into deepest subxact: apply in-progress SET context */ + if (activeSetStmts != NULL) + { + appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data); } if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e28dee516..985e0b2f7 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -46,8 +46,20 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; /* list of connections that are part of the current coordinated transaction */ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); -/* stack of active sub-transactions */ -static List *activeSubXacts = NIL; +/* + * activeSetStmts keeps track of SET LOCAL statements executed within the current + * subxact and will be set to NULL when pushing into new subxact or ending top xact. + */ +StringInfo activeSetStmts; + +/* + * Though a list, we treat this as a stack, pushing on subxact contexts whenever + * e.g. a SAVEPOINT is executed (though this is actually performed by providing + * PostgreSQL with a sub-xact callback). At present, the context of a subxact + * includes a subxact identifier as well as any SET LOCAL statements propagated + * to workers during the sub-transaction. + */ +static List *activeSubXactContexts = NIL; /* some pre-allocated memory so we don't need to call malloc() during callbacks */ MemoryContext CommitContext = NULL; @@ -213,6 +225,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); + activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; UnSetDistributedTransactionId(); @@ -265,6 +278,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); + activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; FunctionCallLevel = 0; @@ -484,7 +498,16 @@ static void PushSubXact(SubTransactionId subId) { MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext); - activeSubXacts = lcons_int(subId, activeSubXacts); + + /* save provided subId as well as propagated SET LOCAL stmts */ + SubXactContext *state = palloc(sizeof(SubXactContext)); + state->subId = subId; + state->setLocalCmds = activeSetStmts; + + /* append to list and reset active set stmts for upcoming sub-xact */ + activeSubXactContexts = lcons(state, activeSubXactContexts); + activeSetStmts = makeStringInfo(); + MemoryContextSwitchTo(old_context); } @@ -494,8 +517,17 @@ static void PopSubXact(SubTransactionId subId) { MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext); - Assert(linitial_int(activeSubXacts) == subId); - activeSubXacts = list_delete_first(activeSubXacts); + SubXactContext *state = linitial(activeSubXactContexts); + + /* + * the previous activeSetStmts is already invalid because it's in the now- + * aborted subxact (what we're popping), so no need to free before assign- + * ing with the setLocalCmds of the popped context + */ + Assert(state->subId == subId); + activeSetStmts = state->setLocalCmds; + activeSubXactContexts = list_delete_first(activeSubXactContexts); + MemoryContextSwitchTo(old_context); } @@ -504,23 +536,44 @@ PopSubXact(SubTransactionId subId) List * ActiveSubXacts(void) { - ListCell *subIdCell = NULL; + ListCell *subXactCell = NULL; List *activeSubXactsReversed = NIL; /* - * activeSubXacts is in reversed temporal order, so we reverse it to get it + * activeSubXactContexts is in reversed temporal order, so we reverse it to get it * in temporal order. */ - foreach(subIdCell, activeSubXacts) + foreach(subXactCell, activeSubXactContexts) { - SubTransactionId subId = lfirst_int(subIdCell); - activeSubXactsReversed = lcons_int(subId, activeSubXactsReversed); + SubXactContext *state = lfirst(subXactCell); + activeSubXactsReversed = lcons_int(state->subId, activeSubXactsReversed); } return activeSubXactsReversed; } +/* ActiveSubXactContexts returns the list of active sub-xact context in temporal order. */ +List * +ActiveSubXactContexts(void) +{ + ListCell *subXactCell = NULL; + List *reversedSubXactStates = NIL; + + /* + * activeSubXactContexts is in reversed temporal order, so we reverse it to get it + * in temporal order. + */ + foreach(subXactCell, activeSubXactContexts) + { + SubXactContext *state = lfirst(subXactCell); + reversedSubXactStates = lcons(state, reversedSubXactStates); + } + + return reversedSubXactStates; +} + + /* * If an ERROR is thrown while processing a transaction the ABORT handler is called. * ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 74b4bc0e6..a4da1b49f 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -123,5 +123,7 @@ extern void ProcessTruncateStatement(TruncateStmt *truncateStatement); /* vacuum.c - froward declarations */ extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); +extern bool SetCommandTargetIsValid(VariableSetStmt *setStmt); +extern void ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand); #endif /*CITUS_COMMANDS_H */ diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 0d115f37b..64792f3a1 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -16,9 +16,17 @@ #include "utils/relcache.h" #include "tcop/utility.h" +typedef enum +{ + PROPSETCMD_INVALID = -1, + PROPSETCMD_NONE, /* do not propagate SET commands */ + PROPSETCMD_LOCAL, /* propagate SET LOCAL commands */ + PROPSETCMD_SESSION, /* propagate SET commands, but not SET LOCAL ones */ + PROPSETCMD_ALL /* propagate all SET commands */ +} PropSetCmdBehavior; +extern PropSetCmdBehavior PropagateSetCommands; extern bool EnableDDLPropagation; - /* * A DDLJob encapsulates the remote tasks and commands needed to process all or * part of a distributed DDL command. It hold the distributed relation's oid, diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 96e49f4a3..414fff800 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -10,6 +10,7 @@ #define TRANSACTION_MANAGMENT_H #include "lib/ilist.h" +#include "lib/stringinfo.h" #include "nodes/pg_list.h" /* describes what kind of modifications have occurred in the current transaction */ @@ -53,6 +54,13 @@ typedef enum COMMIT_PROTOCOL_2PC = 2 } CommitProtocolType; +/* Enumeration to keep track of context within nested sub-transactions */ +typedef struct SubXactContext +{ + SubTransactionId subId; + StringInfo setLocalCmds; +} SubXactContext; + /* * GUC that determines whether a SELECT in a transaction block should also run in * a transaction block on the worker. @@ -85,6 +93,8 @@ extern int StoredProcedureLevel; /* number of nested function call levels we are currently in */ extern int FunctionCallLevel; +/* SET LOCAL statements active in the current (sub-)transaction. */ +extern StringInfo activeSetStmts; /* * Coordinated transaction management. @@ -99,6 +109,7 @@ extern void InitializeTransactionManagement(void); /* other functions */ extern List * ActiveSubXacts(void); +extern List * ActiveSubXactContexts(void); #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out index 5d317e559..5bd561a2d 100644 --- a/src/test/regress/expected/multi_real_time_transaction.out +++ b/src/test/regress/expected/multi_real_time_transaction.out @@ -341,6 +341,92 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock ROLLBACK; +-- test propagation of SET LOCAL +-- gonna need a non-superuser as we'll use RLS to test GUC propagation +CREATE USER rls_user; +GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user; +GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user; +SELECT run_command_on_workers('CREATE USER rls_user'); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user'); + run_command_on_workers +--------------------------- + (localhost,57637,t,GRANT) + (localhost,57638,t,GRANT) +(2 rows) + +SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user'); + run_command_on_workers +--------------------------- + (localhost,57637,t,GRANT) + (localhost,57638,t,GRANT) +(2 rows) + +-- create trigger on one worker to reject access if GUC not +\c - - - :worker_1_port +SET search_path = 'multi_real_time_transaction'; +ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY; +CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC + USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE)); +\c - - - :master_port +SET ROLE rls_user; +SET search_path = 'multi_real_time_transaction'; +-- shouldn't see all rows because of RLS +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +BEGIN; +-- without enabling SET LOCAL prop, still won't work +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +SET LOCAL citus.propagate_set_commands TO 'local'; +-- now we should be good to go +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + count +------- + 6 +(1 row) + +SAVEPOINT disable_rls; +SET LOCAL app.show_rows TO FALSE; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +ROLLBACK TO SAVEPOINT disable_rls; +SELECT COUNT(*) FROM test_table; + count +------- + 6 +(1 row) + +SAVEPOINT disable_rls_for_real; +SET LOCAL app.show_rows TO FALSE; +RELEASE SAVEPOINT disable_rls_for_real; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +COMMIT; +RESET ROLE; -- sequential real-time queries should be successfully executed -- since the queries are sent over the same connection BEGIN; diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out index 5dbe2a9b4..ab076d91f 100644 --- a/src/test/regress/expected/multi_real_time_transaction_0.out +++ b/src/test/regress/expected/multi_real_time_transaction_0.out @@ -349,6 +349,92 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock ROLLBACK; +-- test propagation of SET LOCAL +-- gonna need a non-superuser as we'll use RLS to test GUC propagation +CREATE USER rls_user; +GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user; +GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user; +SELECT run_command_on_workers('CREATE USER rls_user'); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user'); + run_command_on_workers +--------------------------- + (localhost,57637,t,GRANT) + (localhost,57638,t,GRANT) +(2 rows) + +SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user'); + run_command_on_workers +--------------------------- + (localhost,57637,t,GRANT) + (localhost,57638,t,GRANT) +(2 rows) + +-- create trigger on one worker to reject access if GUC not +\c - - - :worker_1_port +SET search_path = 'multi_real_time_transaction'; +ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY; +CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC + USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE)); +\c - - - :master_port +SET ROLE rls_user; +SET search_path = 'multi_real_time_transaction'; +-- shouldn't see all rows because of RLS +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +BEGIN; +-- without enabling SET LOCAL prop, still won't work +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +SET LOCAL citus.propagate_set_commands TO 'local'; +-- now we should be good to go +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + count +------- + 6 +(1 row) + +SAVEPOINT disable_rls; +SET LOCAL app.show_rows TO FALSE; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +ROLLBACK TO SAVEPOINT disable_rls; +SELECT COUNT(*) FROM test_table; + count +------- + 6 +(1 row) + +SAVEPOINT disable_rls_for_real; +SET LOCAL app.show_rows TO FALSE; +RELEASE SAVEPOINT disable_rls_for_real; +SELECT COUNT(*) FROM test_table; + count +------- + 4 +(1 row) + +COMMIT; +RESET ROLE; -- sequential real-time queries should be successfully executed -- since the queries are sent over the same connection BEGIN; diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql index d59bc83f4..742142dfc 100644 --- a/src/test/regress/sql/multi_real_time_transaction.sql +++ b/src/test/regress/sql/multi_real_time_transaction.sql @@ -213,6 +213,59 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ROLLBACK; +-- test propagation of SET LOCAL +-- gonna need a non-superuser as we'll use RLS to test GUC propagation +CREATE USER rls_user; +GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user; +GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user; + +SELECT run_command_on_workers('CREATE USER rls_user'); +SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user'); +SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user'); + +-- create trigger on one worker to reject access if GUC not +\c - - - :worker_1_port +SET search_path = 'multi_real_time_transaction'; + +ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY; + +CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC + USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE)); + +\c - - - :master_port +SET ROLE rls_user; +SET search_path = 'multi_real_time_transaction'; + +-- shouldn't see all rows because of RLS +SELECT COUNT(*) FROM test_table; + +BEGIN; +-- without enabling SET LOCAL prop, still won't work +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + +SET LOCAL citus.propagate_set_commands TO 'local'; + +-- now we should be good to go +SET LOCAL app.show_rows TO TRUE; +SELECT COUNT(*) FROM test_table; + +SAVEPOINT disable_rls; +SET LOCAL app.show_rows TO FALSE; +SELECT COUNT(*) FROM test_table; + +ROLLBACK TO SAVEPOINT disable_rls; +SELECT COUNT(*) FROM test_table; + +SAVEPOINT disable_rls_for_real; +SET LOCAL app.show_rows TO FALSE; +RELEASE SAVEPOINT disable_rls_for_real; + +SELECT COUNT(*) FROM test_table; +COMMIT; + +RESET ROLE; + -- sequential real-time queries should be successfully executed -- since the queries are sent over the same connection BEGIN;