mirror of https://github.com/citusdata/citus.git
Merge pull request #2689 from citusdata/prop_set_local
Add logic to propagate SET LOCAL at xact start cr: @marcocituspull/2786/head
commit
70055098af
|
@ -56,6 +56,7 @@
|
||||||
|
|
||||||
|
|
||||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||||
|
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
|
||||||
static bool shouldInvalidateForeignKeyGraph = false;
|
static bool shouldInvalidateForeignKeyGraph = false;
|
||||||
static int activeAlterTables = 0;
|
static int activeAlterTables = 0;
|
||||||
|
|
||||||
|
@ -176,6 +177,24 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */
|
||||||
|
if (IsA(parsetree, VariableSetStmt))
|
||||||
|
{
|
||||||
|
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
|
||||||
|
bool propagateSetVar = (PropagateSetCommands == PROPSETCMD_LOCAL &&
|
||||||
|
setStmt->is_local);
|
||||||
|
bool setVarIsValid = SetCommandTargetIsValid(setStmt);
|
||||||
|
|
||||||
|
/* at present, we only implement the NONE and LOCAL behaviors */
|
||||||
|
AssertState(PropagateSetCommands == PROPSETCMD_NONE ||
|
||||||
|
PropagateSetCommands == PROPSETCMD_LOCAL);
|
||||||
|
|
||||||
|
if (propagateSetVar && setVarIsValid && IsMultiStatementTransaction())
|
||||||
|
{
|
||||||
|
ProcessVariableSetStmt(setStmt, queryString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TRANSMIT used to be separate command, but to avoid patching the grammar
|
* TRANSMIT used to be separate command, but to avoid patching the grammar
|
||||||
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
|
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* variableset.c
|
||||||
|
* Support for propagation of SET (commands to set variables)
|
||||||
|
*
|
||||||
|
* Copyright (c) 2019, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "c.h"
|
||||||
|
|
||||||
|
#include "common/string.h"
|
||||||
|
#include "distributed/commands.h"
|
||||||
|
#include "distributed/commands/utility_hook.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/version_compat.h"
|
||||||
|
#include "storage/lmgr.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
#include "lib/ilist.h"
|
||||||
|
#include "utils/varlena.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Checks whether a SET command modifies a variable which might violate assumptions
|
||||||
|
* made by Citus. Because we generally don't want a connection to be destroyed on error,
|
||||||
|
* and because we eagerly ensure the stack can be fully allocated (at backend startup),
|
||||||
|
* permitting changes to these two variables seems unwise. Also, ban propagating the
|
||||||
|
* SET command propagation setting (not for correctness, more to avoid confusion).
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
SetCommandTargetIsValid(VariableSetStmt *setStmt)
|
||||||
|
{
|
||||||
|
/* if this list grows considerably, switch to bsearch */
|
||||||
|
const char *blacklist[] = {
|
||||||
|
"citus.propagate_set_commands",
|
||||||
|
"client_encoding",
|
||||||
|
"exit_on_error",
|
||||||
|
"max_stack_depth"
|
||||||
|
};
|
||||||
|
Index idx = 0;
|
||||||
|
|
||||||
|
for (idx = 0; idx < lengthof(blacklist); idx++)
|
||||||
|
{
|
||||||
|
if (pg_strcasecmp(blacklist[idx], setStmt->name))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ProcessVariableSetStmt actually does the work of propagating a provided SET stmt
|
||||||
|
* to currently-participating worker nodes and adding the SET command test to a string
|
||||||
|
* keeping track of all propagated SET commands since (sub-)xact start.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
const bool raiseInterrupts = true;
|
||||||
|
List *connectionList = NIL;
|
||||||
|
|
||||||
|
/* at present we only support SET LOCAL */
|
||||||
|
AssertArg(setStmt->is_local);
|
||||||
|
|
||||||
|
/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */
|
||||||
|
if (activeSetStmts == NULL)
|
||||||
|
{
|
||||||
|
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
|
||||||
|
activeSetStmts = makeStringInfo();
|
||||||
|
MemoryContextSwitchTo(old_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* send text of SET stmt to participating nodes... */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = NULL;
|
||||||
|
|
||||||
|
transaction = &connection->remoteTransaction;
|
||||||
|
if (transaction->transactionFailed)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, setStmtString))
|
||||||
|
{
|
||||||
|
const bool raiseErrors = true;
|
||||||
|
HandleRemoteTransactionConnectionError(connection, raiseErrors);
|
||||||
|
}
|
||||||
|
|
||||||
|
connectionList = lappend(connectionList, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
WaitForAllConnections(connectionList, raiseInterrupts);
|
||||||
|
|
||||||
|
/* ... and wait for the results */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = NULL;
|
||||||
|
const bool raiseErrors = true;
|
||||||
|
|
||||||
|
transaction = &connection->remoteTransaction;
|
||||||
|
if (transaction->transactionFailed)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClearResults(connection, raiseErrors);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* SET propagation successful: add to active SET stmt string */
|
||||||
|
appendStringInfoString(activeSetStmts, setStmtString);
|
||||||
|
|
||||||
|
/* ensure semicolon on end to allow appending future SET stmts */
|
||||||
|
if (!pg_str_endswith(setStmtString, ";"))
|
||||||
|
{
|
||||||
|
appendStringInfoChar(activeSetStmts, ';');
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,15 +29,9 @@
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "catalog/pg_constraint.h"
|
#include "catalog/pg_constraint.h"
|
||||||
|
#if (PG_VERSION_NUM < 110000)
|
||||||
#if PG_VERSION_NUM < 110000
|
|
||||||
|
|
||||||
/* pg_constraint_fn.h is gone in postgres 11,
|
|
||||||
* get_relation_constraint_oid is merged into pg_constraint.h then
|
|
||||||
*/
|
|
||||||
#include "catalog/pg_constraint_fn.h"
|
#include "catalog/pg_constraint_fn.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
|
#include "distributed/commands.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
@ -59,6 +60,7 @@
|
||||||
#include "tcop/tcopprot.h"
|
#include "tcop/tcopprot.h"
|
||||||
#include "utils/guc.h"
|
#include "utils/guc.h"
|
||||||
#include "utils/guc_tables.h"
|
#include "utils/guc_tables.h"
|
||||||
|
#include "utils/varlena.h"
|
||||||
|
|
||||||
/* marks shared object as one loadable by the postgres version compiled against */
|
/* marks shared object as one loadable by the postgres version compiled against */
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
@ -86,6 +88,13 @@ static int CitusSSLMode = 0;
|
||||||
|
|
||||||
/* *INDENT-OFF* */
|
/* *INDENT-OFF* */
|
||||||
/* GUC enum definitions */
|
/* GUC enum definitions */
|
||||||
|
static const struct config_enum_entry propagate_set_commands_options[] = {
|
||||||
|
{"none", PROPSETCMD_NONE, false},
|
||||||
|
{"local", PROPSETCMD_LOCAL, false},
|
||||||
|
{NULL, 0, false}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
static const struct config_enum_entry task_assignment_policy_options[] = {
|
static const struct config_enum_entry task_assignment_policy_options[] = {
|
||||||
{ "greedy", TASK_ASSIGNMENT_GREEDY, false },
|
{ "greedy", TASK_ASSIGNMENT_GREEDY, false },
|
||||||
{ "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false },
|
{ "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false },
|
||||||
|
@ -630,6 +639,17 @@ RegisterCitusConfigVariables(void)
|
||||||
0,
|
0,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomEnumVariable(
|
||||||
|
"citus.propagate_set_commands",
|
||||||
|
gettext_noop("Sets which SET commands are propagated to workers."),
|
||||||
|
NULL,
|
||||||
|
&PropagateSetCommands,
|
||||||
|
PROPSETCMD_NONE,
|
||||||
|
propagate_set_commands_options,
|
||||||
|
PGC_USERSET,
|
||||||
|
0,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_router_execution",
|
"citus.enable_router_execution",
|
||||||
gettext_noop("Enables router execution"),
|
gettext_noop("Enables router execution"),
|
||||||
|
|
|
@ -93,16 +93,31 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
distributedTransactionId->transactionNumber,
|
distributedTransactionId->transactionNumber,
|
||||||
timestamp);
|
timestamp);
|
||||||
|
|
||||||
/* append in-progress savepoints for this transaction */
|
/* append context for in-progress SAVEPOINTs for this transaction */
|
||||||
activeSubXacts = ActiveSubXacts();
|
activeSubXacts = ActiveSubXactContexts();
|
||||||
transaction->lastSuccessfulSubXact = TopSubTransactionId;
|
transaction->lastSuccessfulSubXact = TopSubTransactionId;
|
||||||
transaction->lastQueuedSubXact = TopSubTransactionId;
|
transaction->lastQueuedSubXact = TopSubTransactionId;
|
||||||
foreach(subIdCell, activeSubXacts)
|
foreach(subIdCell, activeSubXacts)
|
||||||
{
|
{
|
||||||
SubTransactionId subId = lfirst_int(subIdCell);
|
SubXactContext *subXactState = lfirst(subIdCell);
|
||||||
|
|
||||||
|
/* append SET LOCAL state from when SAVEPOINT was encountered... */
|
||||||
|
if (subXactState->setLocalCmds != NULL)
|
||||||
|
{
|
||||||
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
|
subXactState->setLocalCmds->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ... then append SAVEPOINT to enter this subxact */
|
||||||
appendStringInfo(beginAndSetDistributedTransactionId,
|
appendStringInfo(beginAndSetDistributedTransactionId,
|
||||||
"SAVEPOINT savepoint_%u;", subId);
|
"SAVEPOINT savepoint_%u;", subXactState->subId);
|
||||||
transaction->lastQueuedSubXact = subId;
|
transaction->lastQueuedSubXact = subXactState->subId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we've pushed into deepest subxact: apply in-progress SET context */
|
||||||
|
if (activeSetStmts != NULL)
|
||||||
|
{
|
||||||
|
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
||||||
|
|
|
@ -46,8 +46,20 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
/* list of connections that are part of the current coordinated transaction */
|
/* list of connections that are part of the current coordinated transaction */
|
||||||
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
|
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
|
||||||
|
|
||||||
/* stack of active sub-transactions */
|
/*
|
||||||
static List *activeSubXacts = NIL;
|
* activeSetStmts keeps track of SET LOCAL statements executed within the current
|
||||||
|
* subxact and will be set to NULL when pushing into new subxact or ending top xact.
|
||||||
|
*/
|
||||||
|
StringInfo activeSetStmts;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Though a list, we treat this as a stack, pushing on subxact contexts whenever
|
||||||
|
* e.g. a SAVEPOINT is executed (though this is actually performed by providing
|
||||||
|
* PostgreSQL with a sub-xact callback). At present, the context of a subxact
|
||||||
|
* includes a subxact identifier as well as any SET LOCAL statements propagated
|
||||||
|
* to workers during the sub-transaction.
|
||||||
|
*/
|
||||||
|
static List *activeSubXactContexts = NIL;
|
||||||
|
|
||||||
/* some pre-allocated memory so we don't need to call malloc() during callbacks */
|
/* some pre-allocated memory so we don't need to call malloc() during callbacks */
|
||||||
MemoryContext CommitContext = NULL;
|
MemoryContext CommitContext = NULL;
|
||||||
|
@ -67,6 +79,7 @@ int FunctionCallLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
/* transaction management functions */
|
/* transaction management functions */
|
||||||
|
static void BeginCoordinatedTransaction(void);
|
||||||
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
||||||
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||||
SubTransactionId parentSubid, void *arg);
|
SubTransactionId parentSubid, void *arg);
|
||||||
|
@ -78,25 +91,6 @@ static void PopSubXact(SubTransactionId subId);
|
||||||
static void SwallowErrors(void (*func)());
|
static void SwallowErrors(void (*func)());
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* BeginCoordinatedTransaction begins a coordinated transaction. No
|
|
||||||
* pre-existing coordinated transaction may be in progress.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
BeginCoordinatedTransaction(void)
|
|
||||||
{
|
|
||||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
|
|
||||||
CurrentCoordinatedTransactionState != COORD_TRANS_IDLE)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("starting transaction in wrong state")));
|
|
||||||
}
|
|
||||||
|
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
|
|
||||||
|
|
||||||
AssignDistributedTransactionId();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* BeginOrContinueCoordinatedTransaction starts a coordinated transaction,
|
* BeginOrContinueCoordinatedTransaction starts a coordinated transaction,
|
||||||
* unless one already is in progress.
|
* unless one already is in progress.
|
||||||
|
@ -156,6 +150,25 @@ InitializeTransactionManagement(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BeginCoordinatedTransaction begins a coordinated transaction. No
|
||||||
|
* pre-existing coordinated transaction may be in progress./
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BeginCoordinatedTransaction(void)
|
||||||
|
{
|
||||||
|
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
|
||||||
|
CurrentCoordinatedTransactionState != COORD_TRANS_IDLE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("starting transaction in wrong state")));
|
||||||
|
}
|
||||||
|
|
||||||
|
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
|
||||||
|
|
||||||
|
AssignDistributedTransactionId();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Transaction management callback, handling coordinated transaction, and
|
* Transaction management callback, handling coordinated transaction, and
|
||||||
* transaction independent connection management.
|
* transaction independent connection management.
|
||||||
|
@ -212,6 +225,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
|
activeSetStmts = NULL;
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
|
||||||
UnSetDistributedTransactionId();
|
UnSetDistributedTransactionId();
|
||||||
|
@ -264,6 +278,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
|
activeSetStmts = NULL;
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
FunctionCallLevel = 0;
|
FunctionCallLevel = 0;
|
||||||
|
|
||||||
|
@ -483,7 +498,16 @@ static void
|
||||||
PushSubXact(SubTransactionId subId)
|
PushSubXact(SubTransactionId subId)
|
||||||
{
|
{
|
||||||
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
|
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
|
||||||
activeSubXacts = lcons_int(subId, activeSubXacts);
|
|
||||||
|
/* save provided subId as well as propagated SET LOCAL stmts */
|
||||||
|
SubXactContext *state = palloc(sizeof(SubXactContext));
|
||||||
|
state->subId = subId;
|
||||||
|
state->setLocalCmds = activeSetStmts;
|
||||||
|
|
||||||
|
/* append to list and reset active set stmts for upcoming sub-xact */
|
||||||
|
activeSubXactContexts = lcons(state, activeSubXactContexts);
|
||||||
|
activeSetStmts = makeStringInfo();
|
||||||
|
|
||||||
MemoryContextSwitchTo(old_context);
|
MemoryContextSwitchTo(old_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,8 +517,17 @@ static void
|
||||||
PopSubXact(SubTransactionId subId)
|
PopSubXact(SubTransactionId subId)
|
||||||
{
|
{
|
||||||
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
|
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
|
||||||
Assert(linitial_int(activeSubXacts) == subId);
|
SubXactContext *state = linitial(activeSubXactContexts);
|
||||||
activeSubXacts = list_delete_first(activeSubXacts);
|
|
||||||
|
/*
|
||||||
|
* the previous activeSetStmts is already invalid because it's in the now-
|
||||||
|
* aborted subxact (what we're popping), so no need to free before assign-
|
||||||
|
* ing with the setLocalCmds of the popped context
|
||||||
|
*/
|
||||||
|
Assert(state->subId == subId);
|
||||||
|
activeSetStmts = state->setLocalCmds;
|
||||||
|
activeSubXactContexts = list_delete_first(activeSubXactContexts);
|
||||||
|
|
||||||
MemoryContextSwitchTo(old_context);
|
MemoryContextSwitchTo(old_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,23 +536,44 @@ PopSubXact(SubTransactionId subId)
|
||||||
List *
|
List *
|
||||||
ActiveSubXacts(void)
|
ActiveSubXacts(void)
|
||||||
{
|
{
|
||||||
ListCell *subIdCell = NULL;
|
ListCell *subXactCell = NULL;
|
||||||
List *activeSubXactsReversed = NIL;
|
List *activeSubXactsReversed = NIL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* activeSubXacts is in reversed temporal order, so we reverse it to get it
|
* activeSubXactContexts is in reversed temporal order, so we reverse it to get it
|
||||||
* in temporal order.
|
* in temporal order.
|
||||||
*/
|
*/
|
||||||
foreach(subIdCell, activeSubXacts)
|
foreach(subXactCell, activeSubXactContexts)
|
||||||
{
|
{
|
||||||
SubTransactionId subId = lfirst_int(subIdCell);
|
SubXactContext *state = lfirst(subXactCell);
|
||||||
activeSubXactsReversed = lcons_int(subId, activeSubXactsReversed);
|
activeSubXactsReversed = lcons_int(state->subId, activeSubXactsReversed);
|
||||||
}
|
}
|
||||||
|
|
||||||
return activeSubXactsReversed;
|
return activeSubXactsReversed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* ActiveSubXactContexts returns the list of active sub-xact context in temporal order. */
|
||||||
|
List *
|
||||||
|
ActiveSubXactContexts(void)
|
||||||
|
{
|
||||||
|
ListCell *subXactCell = NULL;
|
||||||
|
List *reversedSubXactStates = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* activeSubXactContexts is in reversed temporal order, so we reverse it to get it
|
||||||
|
* in temporal order.
|
||||||
|
*/
|
||||||
|
foreach(subXactCell, activeSubXactContexts)
|
||||||
|
{
|
||||||
|
SubXactContext *state = lfirst(subXactCell);
|
||||||
|
reversedSubXactStates = lcons(state, reversedSubXactStates);
|
||||||
|
}
|
||||||
|
|
||||||
|
return reversedSubXactStates;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If an ERROR is thrown while processing a transaction the ABORT handler is called.
|
* If an ERROR is thrown while processing a transaction the ABORT handler is called.
|
||||||
* ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also
|
* ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also
|
||||||
|
|
|
@ -123,5 +123,7 @@ extern void ProcessTruncateStatement(TruncateStmt *truncateStatement);
|
||||||
/* vacuum.c - froward declarations */
|
/* vacuum.c - froward declarations */
|
||||||
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||||
|
|
||||||
|
extern bool SetCommandTargetIsValid(VariableSetStmt *setStmt);
|
||||||
|
extern void ProcessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);
|
||||||
|
|
||||||
#endif /*CITUS_COMMANDS_H */
|
#endif /*CITUS_COMMANDS_H */
|
||||||
|
|
|
@ -16,9 +16,17 @@
|
||||||
#include "utils/relcache.h"
|
#include "utils/relcache.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
PROPSETCMD_INVALID = -1,
|
||||||
|
PROPSETCMD_NONE, /* do not propagate SET commands */
|
||||||
|
PROPSETCMD_LOCAL, /* propagate SET LOCAL commands */
|
||||||
|
PROPSETCMD_SESSION, /* propagate SET commands, but not SET LOCAL ones */
|
||||||
|
PROPSETCMD_ALL /* propagate all SET commands */
|
||||||
|
} PropSetCmdBehavior;
|
||||||
|
extern PropSetCmdBehavior PropagateSetCommands;
|
||||||
extern bool EnableDDLPropagation;
|
extern bool EnableDDLPropagation;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A DDLJob encapsulates the remote tasks and commands needed to process all or
|
* A DDLJob encapsulates the remote tasks and commands needed to process all or
|
||||||
* part of a distributed DDL command. It hold the distributed relation's oid,
|
* part of a distributed DDL command. It hold the distributed relation's oid,
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
#define TRANSACTION_MANAGMENT_H
|
#define TRANSACTION_MANAGMENT_H
|
||||||
|
|
||||||
#include "lib/ilist.h"
|
#include "lib/ilist.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
/* describes what kind of modifications have occurred in the current transaction */
|
/* describes what kind of modifications have occurred in the current transaction */
|
||||||
|
@ -53,6 +54,13 @@ typedef enum
|
||||||
COMMIT_PROTOCOL_2PC = 2
|
COMMIT_PROTOCOL_2PC = 2
|
||||||
} CommitProtocolType;
|
} CommitProtocolType;
|
||||||
|
|
||||||
|
/* Enumeration to keep track of context within nested sub-transactions */
|
||||||
|
typedef struct SubXactContext
|
||||||
|
{
|
||||||
|
SubTransactionId subId;
|
||||||
|
StringInfo setLocalCmds;
|
||||||
|
} SubXactContext;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GUC that determines whether a SELECT in a transaction block should also run in
|
* GUC that determines whether a SELECT in a transaction block should also run in
|
||||||
* a transaction block on the worker.
|
* a transaction block on the worker.
|
||||||
|
@ -85,11 +93,12 @@ extern int StoredProcedureLevel;
|
||||||
/* number of nested function call levels we are currently in */
|
/* number of nested function call levels we are currently in */
|
||||||
extern int FunctionCallLevel;
|
extern int FunctionCallLevel;
|
||||||
|
|
||||||
|
/* SET LOCAL statements active in the current (sub-)transaction. */
|
||||||
|
extern StringInfo activeSetStmts;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Coordinated transaction management.
|
* Coordinated transaction management.
|
||||||
*/
|
*/
|
||||||
extern void BeginCoordinatedTransaction(void);
|
|
||||||
extern void BeginOrContinueCoordinatedTransaction(void);
|
extern void BeginOrContinueCoordinatedTransaction(void);
|
||||||
extern bool InCoordinatedTransaction(void);
|
extern bool InCoordinatedTransaction(void);
|
||||||
extern void CoordinatedTransactionUse2PC(void);
|
extern void CoordinatedTransactionUse2PC(void);
|
||||||
|
@ -100,6 +109,7 @@ extern void InitializeTransactionManagement(void);
|
||||||
|
|
||||||
/* other functions */
|
/* other functions */
|
||||||
extern List * ActiveSubXacts(void);
|
extern List * ActiveSubXacts(void);
|
||||||
|
extern List * ActiveSubXactContexts(void);
|
||||||
|
|
||||||
|
|
||||||
#endif /* TRANSACTION_MANAGMENT_H */
|
#endif /* TRANSACTION_MANAGMENT_H */
|
||||||
|
|
|
@ -341,6 +341,92 @@ BEGIN;
|
||||||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||||
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- test propagation of SET LOCAL
|
||||||
|
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
|
||||||
|
CREATE USER rls_user;
|
||||||
|
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
SELECT run_command_on_workers('CREATE USER rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
-----------------------------------
|
||||||
|
(localhost,57637,t,"CREATE ROLE")
|
||||||
|
(localhost,57638,t,"CREATE ROLE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------
|
||||||
|
(localhost,57637,t,GRANT)
|
||||||
|
(localhost,57638,t,GRANT)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------
|
||||||
|
(localhost,57637,t,GRANT)
|
||||||
|
(localhost,57638,t,GRANT)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- create trigger on one worker to reject access if GUC not
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
|
||||||
|
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
|
||||||
|
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
|
||||||
|
\c - - - :master_port
|
||||||
|
SET ROLE rls_user;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
-- shouldn't see all rows because of RLS
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- without enabling SET LOCAL prop, still won't work
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET LOCAL citus.propagate_set_commands TO 'local';
|
||||||
|
-- now we should be good to go
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK TO SAVEPOINT disable_rls;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls_for_real;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
RELEASE SAVEPOINT disable_rls_for_real;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
-- sequential real-time queries should be successfully executed
|
-- sequential real-time queries should be successfully executed
|
||||||
-- since the queries are sent over the same connection
|
-- since the queries are sent over the same connection
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -349,6 +349,92 @@ BEGIN;
|
||||||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||||
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- test propagation of SET LOCAL
|
||||||
|
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
|
||||||
|
CREATE USER rls_user;
|
||||||
|
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
SELECT run_command_on_workers('CREATE USER rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
-----------------------------------
|
||||||
|
(localhost,57637,t,"CREATE ROLE")
|
||||||
|
(localhost,57638,t,"CREATE ROLE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------
|
||||||
|
(localhost,57637,t,GRANT)
|
||||||
|
(localhost,57638,t,GRANT)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------
|
||||||
|
(localhost,57637,t,GRANT)
|
||||||
|
(localhost,57638,t,GRANT)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- create trigger on one worker to reject access if GUC not
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
|
||||||
|
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
|
||||||
|
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
|
||||||
|
\c - - - :master_port
|
||||||
|
SET ROLE rls_user;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
-- shouldn't see all rows because of RLS
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- without enabling SET LOCAL prop, still won't work
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET LOCAL citus.propagate_set_commands TO 'local';
|
||||||
|
-- now we should be good to go
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK TO SAVEPOINT disable_rls;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls_for_real;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
RELEASE SAVEPOINT disable_rls_for_real;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
-- sequential real-time queries should be successfully executed
|
-- sequential real-time queries should be successfully executed
|
||||||
-- since the queries are sent over the same connection
|
-- since the queries are sent over the same connection
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -213,6 +213,59 @@ BEGIN;
|
||||||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- test propagation of SET LOCAL
|
||||||
|
-- gonna need a non-superuser as we'll use RLS to test GUC propagation
|
||||||
|
CREATE USER rls_user;
|
||||||
|
GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user;
|
||||||
|
|
||||||
|
SELECT run_command_on_workers('CREATE USER rls_user');
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
SELECT run_command_on_workers('GRANT ALL ON ALL TABLES IN SCHEMA multi_real_time_transaction TO rls_user');
|
||||||
|
|
||||||
|
-- create trigger on one worker to reject access if GUC not
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
|
||||||
|
ALTER TABLE test_table_1610000 ENABLE ROW LEVEL SECURITY;
|
||||||
|
|
||||||
|
CREATE POLICY hide_by_default ON test_table_1610000 TO PUBLIC
|
||||||
|
USING (COALESCE(current_setting('app.show_rows', TRUE)::bool, FALSE));
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET ROLE rls_user;
|
||||||
|
SET search_path = 'multi_real_time_transaction';
|
||||||
|
|
||||||
|
-- shouldn't see all rows because of RLS
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- without enabling SET LOCAL prop, still won't work
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
||||||
|
SET LOCAL citus.propagate_set_commands TO 'local';
|
||||||
|
|
||||||
|
-- now we should be good to go
|
||||||
|
SET LOCAL app.show_rows TO TRUE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
||||||
|
ROLLBACK TO SAVEPOINT disable_rls;
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
|
||||||
|
SAVEPOINT disable_rls_for_real;
|
||||||
|
SET LOCAL app.show_rows TO FALSE;
|
||||||
|
RELEASE SAVEPOINT disable_rls_for_real;
|
||||||
|
|
||||||
|
SELECT COUNT(*) FROM test_table;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
-- sequential real-time queries should be successfully executed
|
-- sequential real-time queries should be successfully executed
|
||||||
-- since the queries are sent over the same connection
|
-- since the queries are sent over the same connection
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
Loading…
Reference in New Issue