Functions are treated as transaction blocks

pull/2587/head
Marco Slot 2019-01-11 14:43:07 +01:00 committed by Jason Petersen
parent 4b9bd54ae0
commit f2abf2b8e5
12 changed files with 154 additions and 17 deletions

View File

@ -1132,7 +1132,7 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{ {
return false; return false;
} }
else if (!localTableEmpty || IsTransactionBlock()) else if (!localTableEmpty || IsMultiStatementTransaction())
{ {
return true; return true;
} }

View File

@ -1159,7 +1159,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
if (ColumnAppearsInForeignKeyToReferenceTable(affectedColumnName, if (ColumnAppearsInForeignKeyToReferenceTable(affectedColumnName,
relationId)) relationId))
{ {
if (IsTransactionBlock() && alterTableType == AT_AlterColumnType) if (alterTableType == AT_AlterColumnType)
{ {
SetLocalMultiShardModifyModeToSequential(); SetLocalMultiShardModifyModeToSequential();
} }

View File

@ -34,6 +34,7 @@
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "tcop/dest.h"
#include "tcop/pquery.h" #include "tcop/pquery.h"
#include "tcop/utility.h" #include "tcop/utility.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
@ -105,6 +106,19 @@ void
CitusExecutorRun(QueryDesc *queryDesc, CitusExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once) ScanDirection direction, uint64 count, bool execute_once)
{ {
DestReceiver *dest = queryDesc->dest;
int originalLevel = FunctionCallLevel;
if (dest->mydest == DestSPI)
{
/*
* If the query runs via SPI, we assume we're in a function call
* and we should treat statements as part of a bigger transaction.
* We reset this counter to 0 in the abort handler.
*/
FunctionCallLevel++;
}
/* /*
* Disable execution of ALTER TABLE constraint validation queries. These * Disable execution of ALTER TABLE constraint validation queries. These
* constraints will be validated in worker nodes, so running these queries * constraints will be validated in worker nodes, so running these queries
@ -122,7 +136,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
if (AlterTableConstraintCheck(queryDesc)) if (AlterTableConstraintCheck(queryDesc))
{ {
EState *estate = queryDesc->estate; EState *estate = queryDesc->estate;
DestReceiver *dest = queryDesc->dest;
estate->es_processed = 0; estate->es_processed = 0;
estate->es_lastoid = InvalidOid; estate->es_lastoid = InvalidOid;
@ -135,6 +148,16 @@ CitusExecutorRun(QueryDesc *queryDesc,
{ {
standard_ExecutorRun(queryDesc, direction, count, execute_once); standard_ExecutorRun(queryDesc, direction, count, execute_once);
} }
if (dest->mydest == DestSPI)
{
/*
* Restore the original value. It is not sufficient to decrease
* the value because exceptions might cause us to go back a few
* levels at once.
*/
FunctionCallLevel = originalLevel;
}
} }

View File

@ -104,7 +104,7 @@ MultiRealTimeExecute(Job *job)
workerNodeList = ActiveReadableNodeList(); workerNodeList = ActiveReadableNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList); workerHash = WorkerHash(workerHashName, workerNodeList);
if (IsTransactionBlock() && SelectOpensTransactionBlock) if (IsMultiStatementTransaction() && SelectOpensTransactionBlock)
{ {
BeginOrContinueCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();
} }

View File

@ -631,8 +631,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
* customers already use functions that touch multiple shards from within * customers already use functions that touch multiple shards from within
* a function, so we'll ignore functions for now. * a function, so we'll ignore functions for now.
*/ */
if (IsTransactionBlock() || multipleTasks || taskListRequires2PC || if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
StoredProcedureLevel > 0)
{ {
BeginOrContinueCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();
@ -1118,7 +1117,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
/* if some placements failed, ensure future statements don't access them */ /* if some placements failed, ensure future statements don't access them */
MarkFailedShardPlacements(); MarkFailedShardPlacements();
if (IsTransactionBlock()) if (IsMultiStatementTransaction())
{ {
XactModificationLevel = XACT_MODIFICATION_DATA; XactModificationLevel = XACT_MODIFICATION_DATA;
} }
@ -1287,7 +1286,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
{ {
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */ /* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
} }
else if (IsTransactionBlock() || multipleTasks) else if (IsMultiStatementTransaction() || multipleTasks)
{ {
BeginOrContinueCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();

View File

@ -572,6 +572,21 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.function_opens_transaction_block",
gettext_noop("Open transaction blocks for function calls"),
gettext_noop("When enabled, Citus will always send a BEGIN to workers when "
"running distributed queres in a function. When disabled, the "
"queries may be committed immediately after the statemnent "
"completes. Disabling this flag is dangerous, it is only provided "
"for backwards compatibility with pre-8.2 behaviour."),
&FunctionOpensTransactionBlock,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_deadlock_prevention", "citus.enable_deadlock_prevention",
gettext_noop("Avoids deadlocks by preventing concurrent multi-shard commands"), gettext_noop("Avoids deadlocks by preventing concurrent multi-shard commands"),

View File

@ -625,17 +625,17 @@ GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType)
* ShouldRecordRelationAccess returns true when we should keep track * ShouldRecordRelationAccess returns true when we should keep track
* of the relation accesses. * of the relation accesses.
* *
* In many cases, we'd only need IsTransactionBlock(), however, for some cases such as * In many cases, we'd only need IsMultiStatementTransaction(), however, for some
* CTEs, where Citus uses the same connections accross multiple queries, we should * cases such as CTEs, where Citus uses the same connections accross multiple queries,
* still record the relation accesses even not inside an explicit transaction block. * we should still record the relation accesses even not inside an explicit transaction
* Thus, keeping track of the relation accesses inside coordinated transactions is * block. Thus, keeping track of the relation accesses inside coordinated transactions
* also required. * is also required.
*/ */
bool bool
ShouldRecordRelationAccess() ShouldRecordRelationAccess()
{ {
if (EnforceForeignKeyRestrictions && if (EnforceForeignKeyRestrictions &&
(IsTransactionBlock() || InCoordinatedTransaction())) (IsMultiStatementTransaction() || InCoordinatedTransaction()))
{ {
return true; return true;
} }

View File

@ -59,6 +59,13 @@ MemoryContext CommitContext = NULL;
*/ */
bool CoordinatedTransactionUses2PC = false; bool CoordinatedTransactionUses2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
/* stack depth of UDF calls */
int FunctionCallLevel = 0;
/* transaction management functions */ /* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedTransactionCallback(XactEvent event, void *arg);
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
@ -258,6 +265,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
XactModificationLevel = XACT_MODIFICATION_NONE; XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions); dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false; CoordinatedTransactionUses2PC = false;
FunctionCallLevel = 0;
/* /*
* We should reset SubPlanLevel in case a transaction is aborted, * We should reset SubPlanLevel in case a transaction is aborted,
@ -545,3 +553,34 @@ SwallowErrors(void (*func)())
} }
PG_END_TRY(); PG_END_TRY();
} }
/*
* IsMultiStatementTransaction determines whether the current statement is
* part of a bigger multi-statement transaction. This is the case when the
* statement is wrapped in a transaction block (comes after BEGIN), or it
* is called from a stored procedure or function.
*/
bool
IsMultiStatementTransaction(void)
{
if (IsTransactionBlock())
{
/* in a BEGIN...END block */
return true;
}
else if (StoredProcedureLevel > 0)
{
/* in (a transaction within) a stored procedure */
return true;
}
else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock)
{
/* in a language-handler function call, open a transaction if configured to do so */
return true;
}
else
{
return false;
}
}

View File

@ -36,9 +36,6 @@ typedef struct XactShardConnSet
extern bool AllModificationsCommutative; extern bool AllModificationsCommutative;
extern bool EnableDeadlockPrevention; extern bool EnableDeadlockPrevention;
/* number of nested stored procedure call levels we are currently in */
extern int StoredProcedureLevel;
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);

View File

@ -59,6 +59,12 @@ typedef enum
*/ */
extern bool SelectOpensTransactionBlock; extern bool SelectOpensTransactionBlock;
/*
* GUC that determines whether a function should be considered a transaction
* block.
*/
extern bool FunctionOpensTransactionBlock;
/* config variable managed via guc.c */ /* config variable managed via guc.c */
extern int MultiShardCommitProtocol; extern int MultiShardCommitProtocol;
@ -73,6 +79,13 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
/* list of connections that are part of the current coordinated transaction */ /* list of connections that are part of the current coordinated transaction */
extern dlist_head InProgressTransactions; extern dlist_head InProgressTransactions;
/* number of nested stored procedure call levels we are currently in */
extern int StoredProcedureLevel;
/* number of nested function call levels we are currently in */
extern int FunctionCallLevel;
/* /*
* Coordinated transaction management. * Coordinated transaction management.
*/ */
@ -80,6 +93,7 @@ extern void BeginCoordinatedTransaction(void);
extern void BeginOrContinueCoordinatedTransaction(void); extern void BeginOrContinueCoordinatedTransaction(void);
extern bool InCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void);
extern void CoordinatedTransactionUse2PC(void); extern void CoordinatedTransactionUse2PC(void);
extern bool IsMultiStatementTransaction(void);
/* initialization function(s) */ /* initialization function(s) */
extern void InitializeTransactionManagement(void); extern void InitializeTransactionManagement(void);

View File

@ -1577,4 +1577,33 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
(1 row) (1 row)
END; END;
-- make sure functions that throw an error roll back propertly
CREATE FUNCTION insert_abort()
RETURNS bool
AS $BODY$
BEGIN
INSERT INTO labs VALUES (1001, 'Abort Labs');
UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001;
RAISE 'do not insert';
END;
$BODY$ LANGUAGE plpgsql;
SELECT insert_abort();
ERROR: do not insert
SELECT name FROM labs WHERE id = 1001;
name
------
(0 rows)
-- if function_opens_transaction-block is disabled the insert commits immediately
SET citus.function_opens_transaction_block TO off;
SELECT insert_abort();
ERROR: do not insert
SELECT name FROM labs WHERE id = 1001;
name
---------------
Rollback Labs
(1 row)
RESET citus.function_opens_transaction_block;
DROP FUNCTION insert_abort();
DROP TABLE items, users, itemgroups, usergroups, researchers, labs; DROP TABLE items, users, itemgroups, usergroups, researchers, labs;

View File

@ -1157,4 +1157,25 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
END; END;
-- make sure functions that throw an error roll back propertly
CREATE FUNCTION insert_abort()
RETURNS bool
AS $BODY$
BEGIN
INSERT INTO labs VALUES (1001, 'Abort Labs');
UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001;
RAISE 'do not insert';
END;
$BODY$ LANGUAGE plpgsql;
SELECT insert_abort();
SELECT name FROM labs WHERE id = 1001;
-- if function_opens_transaction-block is disabled the insert commits immediately
SET citus.function_opens_transaction_block TO off;
SELECT insert_abort();
SELECT name FROM labs WHERE id = 1001;
RESET citus.function_opens_transaction_block;
DROP FUNCTION insert_abort();
DROP TABLE items, users, itemgroups, usergroups, researchers, labs; DROP TABLE items, users, itemgroups, usergroups, researchers, labs;