mirror of https://github.com/citusdata/citus.git
Merge pull request #2587 from citusdata/xact_functions
Treat functions as transaction blocks cr: @jasonmp85pull/2628/head
commit
8787cb3199
|
@ -376,87 +376,6 @@ CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
|
||||||
COMMENT ON AGGREGATE array_cat_agg(anyarray)
|
COMMENT ON AGGREGATE array_cat_agg(anyarray)
|
||||||
IS 'concatenate input arrays into a single array';
|
IS 'concatenate input arrays into a single array';
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Creates a temporary table exactly like the specified target table along with
|
|
||||||
* a trigger to redirect any INSERTed rows from the proxy to the underlying
|
|
||||||
* table. Users may optionally provide a sequence which will be incremented
|
|
||||||
* after each row that has been successfully proxied (useful for counting rows
|
|
||||||
* processed). Returns the name of the proxy table that was created.
|
|
||||||
*/
|
|
||||||
CREATE FUNCTION create_insert_proxy_for_table(target_table regclass,
|
|
||||||
sequence regclass DEFAULT NULL)
|
|
||||||
RETURNS text
|
|
||||||
AS $create_insert_proxy_for_table$
|
|
||||||
DECLARE
|
|
||||||
temp_table_name text;
|
|
||||||
attr_names text[];
|
|
||||||
attr_list text;
|
|
||||||
param_list text;
|
|
||||||
using_list text;
|
|
||||||
insert_command text;
|
|
||||||
-- templates to create dynamic functions, tables, and triggers
|
|
||||||
func_tmpl CONSTANT text := $$CREATE FUNCTION pg_temp.copy_to_insert()
|
|
||||||
RETURNS trigger
|
|
||||||
AS $copy_to_insert$
|
|
||||||
BEGIN
|
|
||||||
EXECUTE %L USING %s;
|
|
||||||
PERFORM nextval(%L);
|
|
||||||
RETURN NULL;
|
|
||||||
END;
|
|
||||||
$copy_to_insert$ LANGUAGE plpgsql;$$;
|
|
||||||
table_tmpl CONSTANT text := $$CREATE TEMPORARY TABLE %I
|
|
||||||
(LIKE %s INCLUDING DEFAULTS)$$;
|
|
||||||
trigger_tmpl CONSTANT text := $$CREATE TRIGGER copy_to_insert
|
|
||||||
BEFORE INSERT ON %s FOR EACH ROW
|
|
||||||
EXECUTE PROCEDURE pg_temp.copy_to_insert()$$;
|
|
||||||
BEGIN
|
|
||||||
-- create name of temporary table using unqualified input table name
|
|
||||||
SELECT format('%s_insert_proxy', relname)
|
|
||||||
INTO STRICT temp_table_name
|
|
||||||
FROM pg_class
|
|
||||||
WHERE oid = target_table;
|
|
||||||
|
|
||||||
-- get list of all attributes in table, we'll need shortly
|
|
||||||
SELECT array_agg(attname)
|
|
||||||
INTO STRICT attr_names
|
|
||||||
FROM pg_attribute
|
|
||||||
WHERE attrelid = target_table AND
|
|
||||||
attnum > 0 AND
|
|
||||||
NOT attisdropped;
|
|
||||||
|
|
||||||
-- build fully specified column list and USING clause from attr. names
|
|
||||||
SELECT string_agg(quote_ident(attr_name), ','),
|
|
||||||
string_agg(format('NEW.%I', attr_name), ',')
|
|
||||||
INTO STRICT attr_list,
|
|
||||||
using_list
|
|
||||||
FROM unnest(attr_names) AS attr_name;
|
|
||||||
|
|
||||||
-- build ($1, $2, $3)-style VALUE list to bind parameters
|
|
||||||
SELECT string_agg('$' || param_num, ',')
|
|
||||||
INTO STRICT param_list
|
|
||||||
FROM generate_series(1, array_length(attr_names, 1)) AS param_num;
|
|
||||||
|
|
||||||
-- use the above lists to generate appropriate INSERT command
|
|
||||||
insert_command = format('INSERT INTO %s (%s) VALUES (%s)', target_table,
|
|
||||||
attr_list, param_list);
|
|
||||||
|
|
||||||
-- use the command to make one-off trigger targeting specified table
|
|
||||||
EXECUTE format(func_tmpl, insert_command, using_list, sequence);
|
|
||||||
|
|
||||||
-- create a temporary table exactly like the target table...
|
|
||||||
EXECUTE format(table_tmpl, temp_table_name, target_table);
|
|
||||||
|
|
||||||
-- ... and install the trigger on that temporary table
|
|
||||||
EXECUTE format(trigger_tmpl, quote_ident(temp_table_name)::regclass);
|
|
||||||
|
|
||||||
RETURN temp_table_name;
|
|
||||||
END;
|
|
||||||
$create_insert_proxy_for_table$ LANGUAGE plpgsql SET search_path = 'pg_catalog';
|
|
||||||
|
|
||||||
COMMENT ON FUNCTION create_insert_proxy_for_table(regclass, regclass)
|
|
||||||
IS 'create a proxy table that redirects INSERTed rows to a target table';
|
|
||||||
|
|
||||||
-- define shard repair function
|
-- define shard repair function
|
||||||
CREATE FUNCTION master_copy_shard_placement(shard_id bigint,
|
CREATE FUNCTION master_copy_shard_placement(shard_id bigint,
|
||||||
source_node_name text,
|
source_node_name text,
|
||||||
|
|
|
@ -7,7 +7,6 @@ DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
|
||||||
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
|
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
|
||||||
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
|
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
|
||||||
DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text);
|
DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text);
|
||||||
REVOKE ALL ON FUNCTION create_insert_proxy_for_table(regclass,regclass) FROM PUBLIC;
|
|
||||||
|
|
||||||
-- Testing functions
|
-- Testing functions
|
||||||
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
|
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
/* citus--8.2-1--8.2-2.sql */
|
||||||
|
|
||||||
|
DROP FUNCTION IF EXISTS pg_catalog.create_insert_proxy_for_table(regclass,regclass);
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '8.2-1'
|
default_version = '8.2-2'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -1132,7 +1132,7 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
else if (!localTableEmpty || IsTransactionBlock())
|
else if (!localTableEmpty || IsMultiStatementTransaction())
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1159,7 +1159,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
if (ColumnAppearsInForeignKeyToReferenceTable(affectedColumnName,
|
if (ColumnAppearsInForeignKeyToReferenceTable(affectedColumnName,
|
||||||
relationId))
|
relationId))
|
||||||
{
|
{
|
||||||
if (IsTransactionBlock() && alterTableType == AT_AlterColumnType)
|
if (alterTableType == AT_AlterColumnType)
|
||||||
{
|
{
|
||||||
SetLocalMultiShardModifyModeToSequential();
|
SetLocalMultiShardModifyModeToSequential();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "parser/parsetree.h"
|
#include "parser/parsetree.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
|
#include "tcop/dest.h"
|
||||||
#include "tcop/pquery.h"
|
#include "tcop/pquery.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
#include "utils/snapmgr.h"
|
#include "utils/snapmgr.h"
|
||||||
|
@ -105,6 +106,19 @@ void
|
||||||
CitusExecutorRun(QueryDesc *queryDesc,
|
CitusExecutorRun(QueryDesc *queryDesc,
|
||||||
ScanDirection direction, uint64 count, bool execute_once)
|
ScanDirection direction, uint64 count, bool execute_once)
|
||||||
{
|
{
|
||||||
|
DestReceiver *dest = queryDesc->dest;
|
||||||
|
int originalLevel = FunctionCallLevel;
|
||||||
|
|
||||||
|
if (dest->mydest == DestSPI)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If the query runs via SPI, we assume we're in a function call
|
||||||
|
* and we should treat statements as part of a bigger transaction.
|
||||||
|
* We reset this counter to 0 in the abort handler.
|
||||||
|
*/
|
||||||
|
FunctionCallLevel++;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Disable execution of ALTER TABLE constraint validation queries. These
|
* Disable execution of ALTER TABLE constraint validation queries. These
|
||||||
* constraints will be validated in worker nodes, so running these queries
|
* constraints will be validated in worker nodes, so running these queries
|
||||||
|
@ -122,7 +136,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
|
||||||
if (AlterTableConstraintCheck(queryDesc))
|
if (AlterTableConstraintCheck(queryDesc))
|
||||||
{
|
{
|
||||||
EState *estate = queryDesc->estate;
|
EState *estate = queryDesc->estate;
|
||||||
DestReceiver *dest = queryDesc->dest;
|
|
||||||
|
|
||||||
estate->es_processed = 0;
|
estate->es_processed = 0;
|
||||||
estate->es_lastoid = InvalidOid;
|
estate->es_lastoid = InvalidOid;
|
||||||
|
@ -135,6 +148,16 @@ CitusExecutorRun(QueryDesc *queryDesc,
|
||||||
{
|
{
|
||||||
standard_ExecutorRun(queryDesc, direction, count, execute_once);
|
standard_ExecutorRun(queryDesc, direction, count, execute_once);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dest->mydest == DestSPI)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Restore the original value. It is not sufficient to decrease
|
||||||
|
* the value because exceptions might cause us to go back a few
|
||||||
|
* levels at once.
|
||||||
|
*/
|
||||||
|
FunctionCallLevel = originalLevel;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,7 @@ MultiRealTimeExecute(Job *job)
|
||||||
workerNodeList = ActiveReadableNodeList();
|
workerNodeList = ActiveReadableNodeList();
|
||||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||||
|
|
||||||
if (IsTransactionBlock() && SelectOpensTransactionBlock)
|
if (IsMultiStatementTransaction() && SelectOpensTransactionBlock)
|
||||||
{
|
{
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
}
|
}
|
||||||
|
|
|
@ -631,8 +631,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
||||||
* customers already use functions that touch multiple shards from within
|
* customers already use functions that touch multiple shards from within
|
||||||
* a function, so we'll ignore functions for now.
|
* a function, so we'll ignore functions for now.
|
||||||
*/
|
*/
|
||||||
if (IsTransactionBlock() || multipleTasks || taskListRequires2PC ||
|
if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
|
||||||
StoredProcedureLevel > 0)
|
|
||||||
{
|
{
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
@ -1118,7 +1117,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
||||||
/* if some placements failed, ensure future statements don't access them */
|
/* if some placements failed, ensure future statements don't access them */
|
||||||
MarkFailedShardPlacements();
|
MarkFailedShardPlacements();
|
||||||
|
|
||||||
if (IsTransactionBlock())
|
if (IsMultiStatementTransaction())
|
||||||
{
|
{
|
||||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||||
}
|
}
|
||||||
|
@ -1287,7 +1286,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
|
||||||
{
|
{
|
||||||
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
|
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
|
||||||
}
|
}
|
||||||
else if (IsTransactionBlock() || multipleTasks)
|
else if (IsMultiStatementTransaction() || multipleTasks)
|
||||||
{
|
{
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
|
|
@ -572,6 +572,21 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL,
|
GUC_NO_SHOW_ALL,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.function_opens_transaction_block",
|
||||||
|
gettext_noop("Open transaction blocks for function calls"),
|
||||||
|
gettext_noop("When enabled, Citus will always send a BEGIN to workers when "
|
||||||
|
"running distributed queres in a function. When disabled, the "
|
||||||
|
"queries may be committed immediately after the statemnent "
|
||||||
|
"completes. Disabling this flag is dangerous, it is only provided "
|
||||||
|
"for backwards compatibility with pre-8.2 behaviour."),
|
||||||
|
&FunctionOpensTransactionBlock,
|
||||||
|
true,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_deadlock_prevention",
|
"citus.enable_deadlock_prevention",
|
||||||
gettext_noop("Avoids deadlocks by preventing concurrent multi-shard commands"),
|
gettext_noop("Avoids deadlocks by preventing concurrent multi-shard commands"),
|
||||||
|
|
|
@ -625,17 +625,17 @@ GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType)
|
||||||
* ShouldRecordRelationAccess returns true when we should keep track
|
* ShouldRecordRelationAccess returns true when we should keep track
|
||||||
* of the relation accesses.
|
* of the relation accesses.
|
||||||
*
|
*
|
||||||
* In many cases, we'd only need IsTransactionBlock(), however, for some cases such as
|
* In many cases, we'd only need IsMultiStatementTransaction(), however, for some
|
||||||
* CTEs, where Citus uses the same connections accross multiple queries, we should
|
* cases such as CTEs, where Citus uses the same connections accross multiple queries,
|
||||||
* still record the relation accesses even not inside an explicit transaction block.
|
* we should still record the relation accesses even not inside an explicit transaction
|
||||||
* Thus, keeping track of the relation accesses inside coordinated transactions is
|
* block. Thus, keeping track of the relation accesses inside coordinated transactions
|
||||||
* also required.
|
* is also required.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ShouldRecordRelationAccess()
|
ShouldRecordRelationAccess()
|
||||||
{
|
{
|
||||||
if (EnforceForeignKeyRestrictions &&
|
if (EnforceForeignKeyRestrictions &&
|
||||||
(IsTransactionBlock() || InCoordinatedTransaction()))
|
(IsMultiStatementTransaction() || InCoordinatedTransaction()))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,13 @@ MemoryContext CommitContext = NULL;
|
||||||
*/
|
*/
|
||||||
bool CoordinatedTransactionUses2PC = false;
|
bool CoordinatedTransactionUses2PC = false;
|
||||||
|
|
||||||
|
/* if disabled, distributed statements in a function may run as separate transactions */
|
||||||
|
bool FunctionOpensTransactionBlock = true;
|
||||||
|
|
||||||
|
/* stack depth of UDF calls */
|
||||||
|
int FunctionCallLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
/* transaction management functions */
|
/* transaction management functions */
|
||||||
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
||||||
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||||
|
@ -258,6 +265,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
FunctionCallLevel = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We should reset SubPlanLevel in case a transaction is aborted,
|
* We should reset SubPlanLevel in case a transaction is aborted,
|
||||||
|
@ -545,3 +553,34 @@ SwallowErrors(void (*func)())
|
||||||
}
|
}
|
||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsMultiStatementTransaction determines whether the current statement is
|
||||||
|
* part of a bigger multi-statement transaction. This is the case when the
|
||||||
|
* statement is wrapped in a transaction block (comes after BEGIN), or it
|
||||||
|
* is called from a stored procedure or function.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsMultiStatementTransaction(void)
|
||||||
|
{
|
||||||
|
if (IsTransactionBlock())
|
||||||
|
{
|
||||||
|
/* in a BEGIN...END block */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (StoredProcedureLevel > 0)
|
||||||
|
{
|
||||||
|
/* in (a transaction within) a stored procedure */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock)
|
||||||
|
{
|
||||||
|
/* in a language-handler function call, open a transaction if configured to do so */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -36,9 +36,6 @@ typedef struct XactShardConnSet
|
||||||
extern bool AllModificationsCommutative;
|
extern bool AllModificationsCommutative;
|
||||||
extern bool EnableDeadlockPrevention;
|
extern bool EnableDeadlockPrevention;
|
||||||
|
|
||||||
/* number of nested stored procedure call levels we are currently in */
|
|
||||||
extern int StoredProcedureLevel;
|
|
||||||
|
|
||||||
|
|
||||||
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
|
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
|
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
|
||||||
|
|
|
@ -59,6 +59,12 @@ typedef enum
|
||||||
*/
|
*/
|
||||||
extern bool SelectOpensTransactionBlock;
|
extern bool SelectOpensTransactionBlock;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GUC that determines whether a function should be considered a transaction
|
||||||
|
* block.
|
||||||
|
*/
|
||||||
|
extern bool FunctionOpensTransactionBlock;
|
||||||
|
|
||||||
/* config variable managed via guc.c */
|
/* config variable managed via guc.c */
|
||||||
extern int MultiShardCommitProtocol;
|
extern int MultiShardCommitProtocol;
|
||||||
|
|
||||||
|
@ -73,6 +79,13 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
|
||||||
/* list of connections that are part of the current coordinated transaction */
|
/* list of connections that are part of the current coordinated transaction */
|
||||||
extern dlist_head InProgressTransactions;
|
extern dlist_head InProgressTransactions;
|
||||||
|
|
||||||
|
/* number of nested stored procedure call levels we are currently in */
|
||||||
|
extern int StoredProcedureLevel;
|
||||||
|
|
||||||
|
/* number of nested function call levels we are currently in */
|
||||||
|
extern int FunctionCallLevel;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Coordinated transaction management.
|
* Coordinated transaction management.
|
||||||
*/
|
*/
|
||||||
|
@ -80,6 +93,7 @@ extern void BeginCoordinatedTransaction(void);
|
||||||
extern void BeginOrContinueCoordinatedTransaction(void);
|
extern void BeginOrContinueCoordinatedTransaction(void);
|
||||||
extern bool InCoordinatedTransaction(void);
|
extern bool InCoordinatedTransaction(void);
|
||||||
extern void CoordinatedTransactionUse2PC(void);
|
extern void CoordinatedTransactionUse2PC(void);
|
||||||
|
extern bool IsMultiStatementTransaction(void);
|
||||||
|
|
||||||
/* initialization function(s) */
|
/* initialization function(s) */
|
||||||
extern void InitializeTransactionManagement(void);
|
extern void InitializeTransactionManagement(void);
|
||||||
|
|
|
@ -1,85 +0,0 @@
|
||||||
SET citus.next_shard_id TO 380000;
|
|
||||||
-- ===================================================================
|
|
||||||
-- test INSERT proxy creation functionality
|
|
||||||
-- ===================================================================
|
|
||||||
-- use transaction to permit multiple calls to proxy function in one session
|
|
||||||
BEGIN;
|
|
||||||
-- use "unorthodox" object names to test quoting
|
|
||||||
CREATE SCHEMA "A$AP Mob"
|
|
||||||
CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" (
|
|
||||||
id bigint PRIMARY KEY,
|
|
||||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
|
||||||
);
|
|
||||||
\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"'
|
|
||||||
-- create proxy and save proxy table name
|
|
||||||
SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename
|
|
||||||
\gset
|
|
||||||
-- insert to proxy, relying on default value
|
|
||||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
|
||||||
-- copy some rows into the proxy
|
|
||||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
|
||||||
-- verify rows were copied to target
|
|
||||||
SELECT * FROM :insert_target ORDER BY id ASC;
|
|
||||||
id | data
|
|
||||||
----+-----------------------------
|
|
||||||
1 | lorem ipsum
|
|
||||||
2 | dolor sit amet
|
|
||||||
3 | consectetur adipiscing elit
|
|
||||||
4 | sed do eiusmod
|
|
||||||
5 | tempor incididunt ut
|
|
||||||
6 | labore et dolore
|
|
||||||
(6 rows)
|
|
||||||
|
|
||||||
-- and not to proxy
|
|
||||||
SELECT count(*) FROM pg_temp.:"proxy_tablename";
|
|
||||||
count
|
|
||||||
-------
|
|
||||||
0
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
ROLLBACK;
|
|
||||||
-- test behavior with distributed table, (so no transaction)
|
|
||||||
CREATE TABLE insert_target (
|
|
||||||
id bigint PRIMARY KEY,
|
|
||||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
|
||||||
);
|
|
||||||
-- squelch WARNINGs that contain worker_port
|
|
||||||
SET client_min_messages TO ERROR;
|
|
||||||
SET citus.shard_count TO 2;
|
|
||||||
SET citus.shard_replication_factor TO 1;
|
|
||||||
SELECT create_distributed_table('insert_target', 'id', 'hash');
|
|
||||||
create_distributed_table
|
|
||||||
--------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
CREATE TEMPORARY SEQUENCE rows_inserted;
|
|
||||||
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
|
|
||||||
\gset
|
|
||||||
-- insert to proxy, again relying on default value
|
|
||||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
|
||||||
-- test copy with bad row in middle
|
|
||||||
\set VERBOSITY terse
|
|
||||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
|
||||||
ERROR: null value in column "data" violates not-null constraint
|
|
||||||
\set VERBOSITY default
|
|
||||||
-- verify rows were copied to distributed table
|
|
||||||
SELECT * FROM insert_target ORDER BY id ASC;
|
|
||||||
id | data
|
|
||||||
----+-----------------------------
|
|
||||||
1 | lorem ipsum
|
|
||||||
2 | dolor sit amet
|
|
||||||
3 | consectetur adipiscing elit
|
|
||||||
4 | sed do eiusmod
|
|
||||||
5 | tempor incididunt ut
|
|
||||||
6 | labore et dolore
|
|
||||||
(6 rows)
|
|
||||||
|
|
||||||
-- the counter should match the number of rows stored
|
|
||||||
SELECT currval('rows_inserted');
|
|
||||||
currval
|
|
||||||
---------
|
|
||||||
6
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
|
|
@ -1577,4 +1577,33 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
END;
|
END;
|
||||||
|
-- make sure functions that throw an error roll back propertly
|
||||||
|
CREATE FUNCTION insert_abort()
|
||||||
|
RETURNS bool
|
||||||
|
AS $BODY$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO labs VALUES (1001, 'Abort Labs');
|
||||||
|
UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001;
|
||||||
|
RAISE 'do not insert';
|
||||||
|
END;
|
||||||
|
$BODY$ LANGUAGE plpgsql;
|
||||||
|
SELECT insert_abort();
|
||||||
|
ERROR: do not insert
|
||||||
|
SELECT name FROM labs WHERE id = 1001;
|
||||||
|
name
|
||||||
|
------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- if function_opens_transaction-block is disabled the insert commits immediately
|
||||||
|
SET citus.function_opens_transaction_block TO off;
|
||||||
|
SELECT insert_abort();
|
||||||
|
ERROR: do not insert
|
||||||
|
SELECT name FROM labs WHERE id = 1001;
|
||||||
|
name
|
||||||
|
---------------
|
||||||
|
Rollback Labs
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.function_opens_transaction_block;
|
||||||
|
DROP FUNCTION insert_abort();
|
||||||
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
|
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
|
||||||
|
|
|
@ -174,7 +174,7 @@ test: multi_complex_count_distinct multi_select_distinct
|
||||||
test: multi_modifications
|
test: multi_modifications
|
||||||
test: multi_distribution_metadata
|
test: multi_distribution_metadata
|
||||||
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
|
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
|
||||||
test: multi_upsert multi_simple_queries multi_create_insert_proxy multi_data_types
|
test: multi_upsert multi_simple_queries multi_data_types
|
||||||
test: multi_utilities foreign_key_to_reference_table validate_constraint
|
test: multi_utilities foreign_key_to_reference_table validate_constraint
|
||||||
test: multi_modifying_xacts
|
test: multi_modifying_xacts
|
||||||
test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions
|
test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions
|
||||||
|
|
|
@ -86,7 +86,6 @@ test: multi_repair_shards
|
||||||
test: multi_modifications
|
test: multi_modifications
|
||||||
test: multi_upsert
|
test: multi_upsert
|
||||||
test: multi_simple_queries
|
test: multi_simple_queries
|
||||||
test: multi_create_insert_proxy
|
|
||||||
test: multi_data_types
|
test: multi_data_types
|
||||||
test: multi_utilities
|
test: multi_utilities
|
||||||
|
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
|
|
||||||
SET citus.next_shard_id TO 380000;
|
|
||||||
|
|
||||||
|
|
||||||
-- ===================================================================
|
|
||||||
-- test INSERT proxy creation functionality
|
|
||||||
-- ===================================================================
|
|
||||||
|
|
||||||
-- use transaction to permit multiple calls to proxy function in one session
|
|
||||||
BEGIN;
|
|
||||||
|
|
||||||
-- use "unorthodox" object names to test quoting
|
|
||||||
CREATE SCHEMA "A$AP Mob"
|
|
||||||
CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" (
|
|
||||||
id bigint PRIMARY KEY,
|
|
||||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
|
||||||
);
|
|
||||||
|
|
||||||
\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"'
|
|
||||||
|
|
||||||
-- create proxy and save proxy table name
|
|
||||||
SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename
|
|
||||||
\gset
|
|
||||||
|
|
||||||
-- insert to proxy, relying on default value
|
|
||||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
|
||||||
|
|
||||||
-- copy some rows into the proxy
|
|
||||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
|
||||||
2 dolor sit amet
|
|
||||||
3 consectetur adipiscing elit
|
|
||||||
4 sed do eiusmod
|
|
||||||
5 tempor incididunt ut
|
|
||||||
6 labore et dolore
|
|
||||||
\.
|
|
||||||
|
|
||||||
-- verify rows were copied to target
|
|
||||||
SELECT * FROM :insert_target ORDER BY id ASC;
|
|
||||||
|
|
||||||
-- and not to proxy
|
|
||||||
SELECT count(*) FROM pg_temp.:"proxy_tablename";
|
|
||||||
|
|
||||||
ROLLBACK;
|
|
||||||
|
|
||||||
-- test behavior with distributed table, (so no transaction)
|
|
||||||
CREATE TABLE insert_target (
|
|
||||||
id bigint PRIMARY KEY,
|
|
||||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
|
||||||
);
|
|
||||||
|
|
||||||
-- squelch WARNINGs that contain worker_port
|
|
||||||
SET client_min_messages TO ERROR;
|
|
||||||
SET citus.shard_count TO 2;
|
|
||||||
SET citus.shard_replication_factor TO 1;
|
|
||||||
|
|
||||||
SELECT create_distributed_table('insert_target', 'id', 'hash');
|
|
||||||
|
|
||||||
CREATE TEMPORARY SEQUENCE rows_inserted;
|
|
||||||
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
|
|
||||||
\gset
|
|
||||||
|
|
||||||
-- insert to proxy, again relying on default value
|
|
||||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
|
||||||
|
|
||||||
-- test copy with bad row in middle
|
|
||||||
\set VERBOSITY terse
|
|
||||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
|
||||||
2 dolor sit amet
|
|
||||||
3 consectetur adipiscing elit
|
|
||||||
4 sed do eiusmod
|
|
||||||
5 tempor incididunt ut
|
|
||||||
6 labore et dolore
|
|
||||||
7 \N
|
|
||||||
8 magna aliqua
|
|
||||||
\.
|
|
||||||
\set VERBOSITY default
|
|
||||||
|
|
||||||
-- verify rows were copied to distributed table
|
|
||||||
SELECT * FROM insert_target ORDER BY id ASC;
|
|
||||||
|
|
||||||
-- the counter should match the number of rows stored
|
|
||||||
SELECT currval('rows_inserted');
|
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
|
|
@ -1157,4 +1157,25 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
|
||||||
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
END;
|
END;
|
||||||
|
|
||||||
|
-- make sure functions that throw an error roll back propertly
|
||||||
|
CREATE FUNCTION insert_abort()
|
||||||
|
RETURNS bool
|
||||||
|
AS $BODY$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO labs VALUES (1001, 'Abort Labs');
|
||||||
|
UPDATE labs SET name = 'Rollback Labs' WHERE id = 1001;
|
||||||
|
RAISE 'do not insert';
|
||||||
|
END;
|
||||||
|
$BODY$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
SELECT insert_abort();
|
||||||
|
SELECT name FROM labs WHERE id = 1001;
|
||||||
|
|
||||||
|
-- if function_opens_transaction-block is disabled the insert commits immediately
|
||||||
|
SET citus.function_opens_transaction_block TO off;
|
||||||
|
SELECT insert_abort();
|
||||||
|
SELECT name FROM labs WHERE id = 1001;
|
||||||
|
RESET citus.function_opens_transaction_block;
|
||||||
|
|
||||||
|
DROP FUNCTION insert_abort();
|
||||||
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
|
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
|
||||||
|
|
Loading…
Reference in New Issue