diff --git a/src/backend/distributed/citus--5.0.sql b/src/backend/distributed/citus--5.0.sql index 316769e62..183982820 100644 --- a/src/backend/distributed/citus--5.0.sql +++ b/src/backend/distributed/citus--5.0.sql @@ -376,87 +376,6 @@ CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); COMMENT ON AGGREGATE array_cat_agg(anyarray) IS 'concatenate input arrays into a single array'; - -/* - * Creates a temporary table exactly like the specified target table along with - * a trigger to redirect any INSERTed rows from the proxy to the underlying - * table. Users may optionally provide a sequence which will be incremented - * after each row that has been successfully proxied (useful for counting rows - * processed). Returns the name of the proxy table that was created. - */ -CREATE FUNCTION create_insert_proxy_for_table(target_table regclass, - sequence regclass DEFAULT NULL) -RETURNS text -AS $create_insert_proxy_for_table$ - DECLARE - temp_table_name text; - attr_names text[]; - attr_list text; - param_list text; - using_list text; - insert_command text; - -- templates to create dynamic functions, tables, and triggers - func_tmpl CONSTANT text := $$CREATE FUNCTION pg_temp.copy_to_insert() - RETURNS trigger - AS $copy_to_insert$ - BEGIN - EXECUTE %L USING %s; - PERFORM nextval(%L); - RETURN NULL; - END; - $copy_to_insert$ LANGUAGE plpgsql;$$; - table_tmpl CONSTANT text := $$CREATE TEMPORARY TABLE %I - (LIKE %s INCLUDING DEFAULTS)$$; - trigger_tmpl CONSTANT text := $$CREATE TRIGGER copy_to_insert - BEFORE INSERT ON %s FOR EACH ROW - EXECUTE PROCEDURE pg_temp.copy_to_insert()$$; - BEGIN - -- create name of temporary table using unqualified input table name - SELECT format('%s_insert_proxy', relname) - INTO STRICT temp_table_name - FROM pg_class - WHERE oid = target_table; - - -- get list of all attributes in table, we'll need shortly - SELECT array_agg(attname) - INTO STRICT attr_names - FROM pg_attribute - WHERE attrelid = target_table AND - attnum > 0 AND - NOT attisdropped; - - -- build fully specified column list and USING clause from attr. names - SELECT string_agg(quote_ident(attr_name), ','), - string_agg(format('NEW.%I', attr_name), ',') - INTO STRICT attr_list, - using_list - FROM unnest(attr_names) AS attr_name; - - -- build ($1, $2, $3)-style VALUE list to bind parameters - SELECT string_agg('$' || param_num, ',') - INTO STRICT param_list - FROM generate_series(1, array_length(attr_names, 1)) AS param_num; - - -- use the above lists to generate appropriate INSERT command - insert_command = format('INSERT INTO %s (%s) VALUES (%s)', target_table, - attr_list, param_list); - - -- use the command to make one-off trigger targeting specified table - EXECUTE format(func_tmpl, insert_command, using_list, sequence); - - -- create a temporary table exactly like the target table... - EXECUTE format(table_tmpl, temp_table_name, target_table); - - -- ... and install the trigger on that temporary table - EXECUTE format(trigger_tmpl, quote_ident(temp_table_name)::regclass); - - RETURN temp_table_name; - END; -$create_insert_proxy_for_table$ LANGUAGE plpgsql SET search_path = 'pg_catalog'; - -COMMENT ON FUNCTION create_insert_proxy_for_table(regclass, regclass) - IS 'create a proxy table that redirects INSERTed rows to a target table'; - -- define shard repair function CREATE FUNCTION master_copy_shard_placement(shard_id bigint, source_node_name text, diff --git a/src/backend/distributed/citus--8.0-10--8.0-11.sql b/src/backend/distributed/citus--8.0-10--8.0-11.sql index d96d45b46..cfba5461e 100644 --- a/src/backend/distributed/citus--8.0-10--8.0-11.sql +++ b/src/backend/distributed/citus--8.0-10--8.0-11.sql @@ -7,7 +7,6 @@ DROP FUNCTION IF EXISTS worker_foreign_file_path(text); DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]); DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer); DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text); -REVOKE ALL ON FUNCTION create_insert_proxy_for_table(regclass,regclass) FROM PUBLIC; -- Testing functions REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC; diff --git a/src/backend/distributed/citus--8.2-1--8.2-2.sql b/src/backend/distributed/citus--8.2-1--8.2-2.sql new file mode 100644 index 000000000..5b36b1739 --- /dev/null +++ b/src/backend/distributed/citus--8.2-1--8.2-2.sql @@ -0,0 +1,3 @@ +/* citus--8.2-1--8.2-2.sql */ + +DROP FUNCTION IF EXISTS pg_catalog.create_insert_proxy_for_table(regclass,regclass); diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 0e1d8e305..643009f15 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.2-1' +default_version = '8.2-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3c5a779b7..03af30755 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1132,7 +1132,7 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty) { return false; } - else if (!localTableEmpty || IsTransactionBlock()) + else if (!localTableEmpty || IsMultiStatementTransaction()) { return true; } diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index caa82c81a..68f0b8f1f 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1159,7 +1159,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) if (ColumnAppearsInForeignKeyToReferenceTable(affectedColumnName, relationId)) { - if (IsTransactionBlock() && alterTableType == AT_AlterColumnType) + if (alterTableType == AT_AlterColumnType) { SetLocalMultiShardModifyModeToSequential(); } diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 0b640fe86..110c7a165 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "parser/parsetree.h" #include "storage/lmgr.h" +#include "tcop/dest.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/snapmgr.h" @@ -105,6 +106,19 @@ void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { + DestReceiver *dest = queryDesc->dest; + int originalLevel = FunctionCallLevel; + + if (dest->mydest == DestSPI) + { + /* + * If the query runs via SPI, we assume we're in a function call + * and we should treat statements as part of a bigger transaction. + * We reset this counter to 0 in the abort handler. + */ + FunctionCallLevel++; + } + /* * Disable execution of ALTER TABLE constraint validation queries. These * constraints will be validated in worker nodes, so running these queries @@ -122,7 +136,6 @@ CitusExecutorRun(QueryDesc *queryDesc, if (AlterTableConstraintCheck(queryDesc)) { EState *estate = queryDesc->estate; - DestReceiver *dest = queryDesc->dest; estate->es_processed = 0; estate->es_lastoid = InvalidOid; @@ -135,6 +148,16 @@ CitusExecutorRun(QueryDesc *queryDesc, { standard_ExecutorRun(queryDesc, direction, count, execute_once); } + + if (dest->mydest == DestSPI) + { + /* + * Restore the original value. It is not sufficient to decrease + * the value because exceptions might cause us to go back a few + * levels at once. + */ + FunctionCallLevel = originalLevel; + } } diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 21f9fd276..e7126a7c6 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -104,7 +104,7 @@ MultiRealTimeExecute(Job *job) workerNodeList = ActiveReadableNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); - if (IsTransactionBlock() && SelectOpensTransactionBlock) + if (IsMultiStatementTransaction() && SelectOpensTransactionBlock) { BeginOrContinueCoordinatedTransaction(); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 9118e1f8c..53b437e19 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -631,8 +631,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) * customers already use functions that touch multiple shards from within * a function, so we'll ignore functions for now. */ - if (IsTransactionBlock() || multipleTasks || taskListRequires2PC || - StoredProcedureLevel > 0) + if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC) { BeginOrContinueCoordinatedTransaction(); @@ -1118,7 +1117,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation /* if some placements failed, ensure future statements don't access them */ MarkFailedShardPlacements(); - if (IsTransactionBlock()) + if (IsMultiStatementTransaction()) { XactModificationLevel = XACT_MODIFICATION_DATA; } @@ -1287,7 +1286,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) { /* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */ } - else if (IsTransactionBlock() || multipleTasks) + else if (IsMultiStatementTransaction() || multipleTasks) { BeginOrContinueCoordinatedTransaction(); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 14bfa7140..66126b7dc 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -572,6 +572,21 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.function_opens_transaction_block", + gettext_noop("Open transaction blocks for function calls"), + gettext_noop("When enabled, Citus will always send a BEGIN to workers when " + "running distributed queres in a function. When disabled, the " + "queries may be committed immediately after the statemnent " + "completes. Disabling this flag is dangerous, it is only provided " + "for backwards compatibility with pre-8.2 behaviour."), + &FunctionOpensTransactionBlock, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + + DefineCustomBoolVariable( "citus.enable_deadlock_prevention", gettext_noop("Avoids deadlocks by preventing concurrent multi-shard commands"), diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index 2f41eaa57..bd36ecff0 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -625,17 +625,17 @@ GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType) * ShouldRecordRelationAccess returns true when we should keep track * of the relation accesses. * - * In many cases, we'd only need IsTransactionBlock(), however, for some cases such as - * CTEs, where Citus uses the same connections accross multiple queries, we should - * still record the relation accesses even not inside an explicit transaction block. - * Thus, keeping track of the relation accesses inside coordinated transactions is - * also required. + * In many cases, we'd only need IsMultiStatementTransaction(), however, for some + * cases such as CTEs, where Citus uses the same connections accross multiple queries, + * we should still record the relation accesses even not inside an explicit transaction + * block. Thus, keeping track of the relation accesses inside coordinated transactions + * is also required. */ bool ShouldRecordRelationAccess() { if (EnforceForeignKeyRestrictions && - (IsTransactionBlock() || InCoordinatedTransaction())) + (IsMultiStatementTransaction() || InCoordinatedTransaction())) { return true; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 70ddfc31e..66bcbf0bf 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -59,6 +59,13 @@ MemoryContext CommitContext = NULL; */ bool CoordinatedTransactionUses2PC = false; +/* if disabled, distributed statements in a function may run as separate transactions */ +bool FunctionOpensTransactionBlock = true; + +/* stack depth of UDF calls */ +int FunctionCallLevel = 0; + + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, @@ -258,6 +265,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + FunctionCallLevel = 0; /* * We should reset SubPlanLevel in case a transaction is aborted, @@ -545,3 +553,34 @@ SwallowErrors(void (*func)()) } PG_END_TRY(); } + + +/* + * IsMultiStatementTransaction determines whether the current statement is + * part of a bigger multi-statement transaction. This is the case when the + * statement is wrapped in a transaction block (comes after BEGIN), or it + * is called from a stored procedure or function. + */ +bool +IsMultiStatementTransaction(void) +{ + if (IsTransactionBlock()) + { + /* in a BEGIN...END block */ + return true; + } + else if (StoredProcedureLevel > 0) + { + /* in (a transaction within) a stored procedure */ + return true; + } + else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock) + { + /* in a language-handler function call, open a transaction if configured to do so */ + return true; + } + else + { + return false; + } +} diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 003d205f3..3149f8fe2 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -36,9 +36,6 @@ typedef struct XactShardConnSet extern bool AllModificationsCommutative; extern bool EnableDeadlockPrevention; -/* number of nested stored procedure call levels we are currently in */ -extern int StoredProcedureLevel; - extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index a7926efd9..f5da6f23c 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -59,6 +59,12 @@ typedef enum */ extern bool SelectOpensTransactionBlock; +/* + * GUC that determines whether a function should be considered a transaction + * block. + */ +extern bool FunctionOpensTransactionBlock; + /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; @@ -73,6 +79,13 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState; /* list of connections that are part of the current coordinated transaction */ extern dlist_head InProgressTransactions; +/* number of nested stored procedure call levels we are currently in */ +extern int StoredProcedureLevel; + +/* number of nested function call levels we are currently in */ +extern int FunctionCallLevel; + + /* * Coordinated transaction management. */ @@ -80,6 +93,7 @@ extern void BeginCoordinatedTransaction(void); extern void BeginOrContinueCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); extern void CoordinatedTransactionUse2PC(void); +extern bool IsMultiStatementTransaction(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); diff --git a/src/test/regress/expected/multi_create_insert_proxy.out b/src/test/regress/expected/multi_create_insert_proxy.out deleted file mode 100644 index 3e2f4d00b..000000000 --- a/src/test/regress/expected/multi_create_insert_proxy.out +++ /dev/null @@ -1,85 +0,0 @@ -SET citus.next_shard_id TO 380000; --- =================================================================== --- test INSERT proxy creation functionality --- =================================================================== --- use transaction to permit multiple calls to proxy function in one session -BEGIN; --- use "unorthodox" object names to test quoting -CREATE SCHEMA "A$AP Mob" - CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" ( - id bigint PRIMARY KEY, - data text NOT NULL DEFAULT 'lorem ipsum' - ); -\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"' --- create proxy and save proxy table name -SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename -\gset --- insert to proxy, relying on default value -INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); --- copy some rows into the proxy -COPY pg_temp.:"proxy_tablename" FROM stdin; --- verify rows were copied to target -SELECT * FROM :insert_target ORDER BY id ASC; - id | data -----+----------------------------- - 1 | lorem ipsum - 2 | dolor sit amet - 3 | consectetur adipiscing elit - 4 | sed do eiusmod - 5 | tempor incididunt ut - 6 | labore et dolore -(6 rows) - --- and not to proxy -SELECT count(*) FROM pg_temp.:"proxy_tablename"; - count -------- - 0 -(1 row) - -ROLLBACK; --- test behavior with distributed table, (so no transaction) -CREATE TABLE insert_target ( - id bigint PRIMARY KEY, - data text NOT NULL DEFAULT 'lorem ipsum' -); --- squelch WARNINGs that contain worker_port -SET client_min_messages TO ERROR; -SET citus.shard_count TO 2; -SET citus.shard_replication_factor TO 1; -SELECT create_distributed_table('insert_target', 'id', 'hash'); - create_distributed_table --------------------------- - -(1 row) - -CREATE TEMPORARY SEQUENCE rows_inserted; -SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename -\gset --- insert to proxy, again relying on default value -INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); --- test copy with bad row in middle -\set VERBOSITY terse -COPY pg_temp.:"proxy_tablename" FROM stdin; -ERROR: null value in column "data" violates not-null constraint -\set VERBOSITY default --- verify rows were copied to distributed table -SELECT * FROM insert_target ORDER BY id ASC; - id | data -----+----------------------------- - 1 | lorem ipsum - 2 | dolor sit amet - 3 | consectetur adipiscing elit - 4 | sed do eiusmod - 5 | tempor incididunt ut - 6 | labore et dolore -(6 rows) - --- the counter should match the number of rows stored -SELECT currval('rows_inserted'); - currval ---------- - 6 -(1 row) - -SET client_min_messages TO DEFAULT; diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 88c090619..5f2f79088 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -1577,4 +1577,33 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; (1 row) END; +-- make sure functions that throw an error roll back propertly +CREATE FUNCTION insert_abort() +RETURNS bool +AS $BODY$ +BEGIN + INSERT INTO labs VALUES (1001, 'Abort Labs'); + UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001; + RAISE 'do not insert'; +END; +$BODY$ LANGUAGE plpgsql; +SELECT insert_abort(); +ERROR: do not insert +SELECT name FROM labs WHERE id = 1001; + name +------ +(0 rows) + +-- if function_opens_transaction-block is disabled the insert commits immediately +SET citus.function_opens_transaction_block TO off; +SELECT insert_abort(); +ERROR: do not insert +SELECT name FROM labs WHERE id = 1001; + name +--------------- + Rollback Labs +(1 row) + +RESET citus.function_opens_transaction_block; +DROP FUNCTION insert_abort(); DROP TABLE items, users, itemgroups, usergroups, researchers, labs; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index ddf2ccc92..924a03a01 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -174,7 +174,7 @@ test: multi_complex_count_distinct multi_select_distinct test: multi_modifications test: multi_distribution_metadata test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards -test: multi_upsert multi_simple_queries multi_create_insert_proxy multi_data_types +test: multi_upsert multi_simple_queries multi_data_types test: multi_utilities foreign_key_to_reference_table validate_constraint test: multi_modifying_xacts test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index a0ce03770..09cec3b6f 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -86,7 +86,6 @@ test: multi_repair_shards test: multi_modifications test: multi_upsert test: multi_simple_queries -test: multi_create_insert_proxy test: multi_data_types test: multi_utilities diff --git a/src/test/regress/sql/multi_create_insert_proxy.sql b/src/test/regress/sql/multi_create_insert_proxy.sql deleted file mode 100644 index ac8ac1ccf..000000000 --- a/src/test/regress/sql/multi_create_insert_proxy.sql +++ /dev/null @@ -1,84 +0,0 @@ - -SET citus.next_shard_id TO 380000; - - --- =================================================================== --- test INSERT proxy creation functionality --- =================================================================== - --- use transaction to permit multiple calls to proxy function in one session -BEGIN; - --- use "unorthodox" object names to test quoting -CREATE SCHEMA "A$AP Mob" - CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" ( - id bigint PRIMARY KEY, - data text NOT NULL DEFAULT 'lorem ipsum' - ); - -\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"' - --- create proxy and save proxy table name -SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename -\gset - --- insert to proxy, relying on default value -INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); - --- copy some rows into the proxy -COPY pg_temp.:"proxy_tablename" FROM stdin; -2 dolor sit amet -3 consectetur adipiscing elit -4 sed do eiusmod -5 tempor incididunt ut -6 labore et dolore -\. - --- verify rows were copied to target -SELECT * FROM :insert_target ORDER BY id ASC; - --- and not to proxy -SELECT count(*) FROM pg_temp.:"proxy_tablename"; - -ROLLBACK; - --- test behavior with distributed table, (so no transaction) -CREATE TABLE insert_target ( - id bigint PRIMARY KEY, - data text NOT NULL DEFAULT 'lorem ipsum' -); - --- squelch WARNINGs that contain worker_port -SET client_min_messages TO ERROR; -SET citus.shard_count TO 2; -SET citus.shard_replication_factor TO 1; - -SELECT create_distributed_table('insert_target', 'id', 'hash'); - -CREATE TEMPORARY SEQUENCE rows_inserted; -SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename -\gset - --- insert to proxy, again relying on default value -INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1); - --- test copy with bad row in middle -\set VERBOSITY terse -COPY pg_temp.:"proxy_tablename" FROM stdin; -2 dolor sit amet -3 consectetur adipiscing elit -4 sed do eiusmod -5 tempor incididunt ut -6 labore et dolore -7 \N -8 magna aliqua -\. -\set VERBOSITY default - --- verify rows were copied to distributed table -SELECT * FROM insert_target ORDER BY id ASC; - --- the counter should match the number of rows stored -SELECT currval('rows_inserted'); - -SET client_min_messages TO DEFAULT; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 2302872e3..64e778f9c 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -1157,4 +1157,25 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2; SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; END; +-- make sure functions that throw an error roll back propertly +CREATE FUNCTION insert_abort() +RETURNS bool +AS $BODY$ +BEGIN + INSERT INTO labs VALUES (1001, 'Abort Labs'); + UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001; + RAISE 'do not insert'; +END; +$BODY$ LANGUAGE plpgsql; + +SELECT insert_abort(); +SELECT name FROM labs WHERE id = 1001; + +-- if function_opens_transaction-block is disabled the insert commits immediately +SET citus.function_opens_transaction_block TO off; +SELECT insert_abort(); +SELECT name FROM labs WHERE id = 1001; +RESET citus.function_opens_transaction_block; + +DROP FUNCTION insert_abort(); DROP TABLE items, users, itemgroups, usergroups, researchers, labs;