mirror of https://github.com/citusdata/citus.git
Remove create_insert_proxy_for_table
parent
84b853e1b5
commit
4b9bd54ae0
|
@ -376,87 +376,6 @@ CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
|
|||
COMMENT ON AGGREGATE array_cat_agg(anyarray)
|
||||
IS 'concatenate input arrays into a single array';
|
||||
|
||||
|
||||
/*
|
||||
* Creates a temporary table exactly like the specified target table along with
|
||||
* a trigger to redirect any INSERTed rows from the proxy to the underlying
|
||||
* table. Users may optionally provide a sequence which will be incremented
|
||||
* after each row that has been successfully proxied (useful for counting rows
|
||||
* processed). Returns the name of the proxy table that was created.
|
||||
*/
|
||||
CREATE FUNCTION create_insert_proxy_for_table(target_table regclass,
|
||||
sequence regclass DEFAULT NULL)
|
||||
RETURNS text
|
||||
AS $create_insert_proxy_for_table$
|
||||
DECLARE
|
||||
temp_table_name text;
|
||||
attr_names text[];
|
||||
attr_list text;
|
||||
param_list text;
|
||||
using_list text;
|
||||
insert_command text;
|
||||
-- templates to create dynamic functions, tables, and triggers
|
||||
func_tmpl CONSTANT text := $$CREATE FUNCTION pg_temp.copy_to_insert()
|
||||
RETURNS trigger
|
||||
AS $copy_to_insert$
|
||||
BEGIN
|
||||
EXECUTE %L USING %s;
|
||||
PERFORM nextval(%L);
|
||||
RETURN NULL;
|
||||
END;
|
||||
$copy_to_insert$ LANGUAGE plpgsql;$$;
|
||||
table_tmpl CONSTANT text := $$CREATE TEMPORARY TABLE %I
|
||||
(LIKE %s INCLUDING DEFAULTS)$$;
|
||||
trigger_tmpl CONSTANT text := $$CREATE TRIGGER copy_to_insert
|
||||
BEFORE INSERT ON %s FOR EACH ROW
|
||||
EXECUTE PROCEDURE pg_temp.copy_to_insert()$$;
|
||||
BEGIN
|
||||
-- create name of temporary table using unqualified input table name
|
||||
SELECT format('%s_insert_proxy', relname)
|
||||
INTO STRICT temp_table_name
|
||||
FROM pg_class
|
||||
WHERE oid = target_table;
|
||||
|
||||
-- get list of all attributes in table, we'll need shortly
|
||||
SELECT array_agg(attname)
|
||||
INTO STRICT attr_names
|
||||
FROM pg_attribute
|
||||
WHERE attrelid = target_table AND
|
||||
attnum > 0 AND
|
||||
NOT attisdropped;
|
||||
|
||||
-- build fully specified column list and USING clause from attr. names
|
||||
SELECT string_agg(quote_ident(attr_name), ','),
|
||||
string_agg(format('NEW.%I', attr_name), ',')
|
||||
INTO STRICT attr_list,
|
||||
using_list
|
||||
FROM unnest(attr_names) AS attr_name;
|
||||
|
||||
-- build ($1, $2, $3)-style VALUE list to bind parameters
|
||||
SELECT string_agg('$' || param_num, ',')
|
||||
INTO STRICT param_list
|
||||
FROM generate_series(1, array_length(attr_names, 1)) AS param_num;
|
||||
|
||||
-- use the above lists to generate appropriate INSERT command
|
||||
insert_command = format('INSERT INTO %s (%s) VALUES (%s)', target_table,
|
||||
attr_list, param_list);
|
||||
|
||||
-- use the command to make one-off trigger targeting specified table
|
||||
EXECUTE format(func_tmpl, insert_command, using_list, sequence);
|
||||
|
||||
-- create a temporary table exactly like the target table...
|
||||
EXECUTE format(table_tmpl, temp_table_name, target_table);
|
||||
|
||||
-- ... and install the trigger on that temporary table
|
||||
EXECUTE format(trigger_tmpl, quote_ident(temp_table_name)::regclass);
|
||||
|
||||
RETURN temp_table_name;
|
||||
END;
|
||||
$create_insert_proxy_for_table$ LANGUAGE plpgsql SET search_path = 'pg_catalog';
|
||||
|
||||
COMMENT ON FUNCTION create_insert_proxy_for_table(regclass, regclass)
|
||||
IS 'create a proxy table that redirects INSERTed rows to a target table';
|
||||
|
||||
-- define shard repair function
|
||||
CREATE FUNCTION master_copy_shard_placement(shard_id bigint,
|
||||
source_node_name text,
|
||||
|
|
|
@ -7,7 +7,6 @@ DROP FUNCTION IF EXISTS worker_foreign_file_path(text);
|
|||
DROP FUNCTION IF EXISTS worker_find_block_local_path(bigint,text[]);
|
||||
DROP FUNCTION IF EXISTS worker_fetch_query_results_file(bigint,integer,integer,text,integer);
|
||||
DROP FUNCTION IF EXISTS master_drop_distributed_table_metadata(regclass,text,text);
|
||||
REVOKE ALL ON FUNCTION create_insert_proxy_for_table(regclass,regclass) FROM PUBLIC;
|
||||
|
||||
-- Testing functions
|
||||
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
/* citus--8.2-1--8.2-2.sql */
|
||||
|
||||
DROP FUNCTION IF EXISTS pg_catalog.create_insert_proxy_for_table(regclass,regclass);
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '8.2-1'
|
||||
default_version = '8.2-2'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
SET citus.next_shard_id TO 380000;
|
||||
-- ===================================================================
|
||||
-- test INSERT proxy creation functionality
|
||||
-- ===================================================================
|
||||
-- use transaction to permit multiple calls to proxy function in one session
|
||||
BEGIN;
|
||||
-- use "unorthodox" object names to test quoting
|
||||
CREATE SCHEMA "A$AP Mob"
|
||||
CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" (
|
||||
id bigint PRIMARY KEY,
|
||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
||||
);
|
||||
\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"'
|
||||
-- create proxy and save proxy table name
|
||||
SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename
|
||||
\gset
|
||||
-- insert to proxy, relying on default value
|
||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
||||
-- copy some rows into the proxy
|
||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
||||
-- verify rows were copied to target
|
||||
SELECT * FROM :insert_target ORDER BY id ASC;
|
||||
id | data
|
||||
----+-----------------------------
|
||||
1 | lorem ipsum
|
||||
2 | dolor sit amet
|
||||
3 | consectetur adipiscing elit
|
||||
4 | sed do eiusmod
|
||||
5 | tempor incididunt ut
|
||||
6 | labore et dolore
|
||||
(6 rows)
|
||||
|
||||
-- and not to proxy
|
||||
SELECT count(*) FROM pg_temp.:"proxy_tablename";
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- test behavior with distributed table, (so no transaction)
|
||||
CREATE TABLE insert_target (
|
||||
id bigint PRIMARY KEY,
|
||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
||||
);
|
||||
-- squelch WARNINGs that contain worker_port
|
||||
SET client_min_messages TO ERROR;
|
||||
SET citus.shard_count TO 2;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('insert_target', 'id', 'hash');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TEMPORARY SEQUENCE rows_inserted;
|
||||
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
|
||||
\gset
|
||||
-- insert to proxy, again relying on default value
|
||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
||||
-- test copy with bad row in middle
|
||||
\set VERBOSITY terse
|
||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
||||
ERROR: null value in column "data" violates not-null constraint
|
||||
\set VERBOSITY default
|
||||
-- verify rows were copied to distributed table
|
||||
SELECT * FROM insert_target ORDER BY id ASC;
|
||||
id | data
|
||||
----+-----------------------------
|
||||
1 | lorem ipsum
|
||||
2 | dolor sit amet
|
||||
3 | consectetur adipiscing elit
|
||||
4 | sed do eiusmod
|
||||
5 | tempor incididunt ut
|
||||
6 | labore et dolore
|
||||
(6 rows)
|
||||
|
||||
-- the counter should match the number of rows stored
|
||||
SELECT currval('rows_inserted');
|
||||
currval
|
||||
---------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEFAULT;
|
|
@ -174,7 +174,7 @@ test: multi_complex_count_distinct multi_select_distinct
|
|||
test: multi_modifications
|
||||
test: multi_distribution_metadata
|
||||
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
|
||||
test: multi_upsert multi_simple_queries multi_create_insert_proxy multi_data_types
|
||||
test: multi_upsert multi_simple_queries multi_data_types
|
||||
test: multi_utilities foreign_key_to_reference_table validate_constraint
|
||||
test: multi_modifying_xacts
|
||||
test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions
|
||||
|
|
|
@ -86,7 +86,6 @@ test: multi_repair_shards
|
|||
test: multi_modifications
|
||||
test: multi_upsert
|
||||
test: multi_simple_queries
|
||||
test: multi_create_insert_proxy
|
||||
test: multi_data_types
|
||||
test: multi_utilities
|
||||
|
||||
|
|
|
@ -1,84 +0,0 @@
|
|||
|
||||
SET citus.next_shard_id TO 380000;
|
||||
|
||||
|
||||
-- ===================================================================
|
||||
-- test INSERT proxy creation functionality
|
||||
-- ===================================================================
|
||||
|
||||
-- use transaction to permit multiple calls to proxy function in one session
|
||||
BEGIN;
|
||||
|
||||
-- use "unorthodox" object names to test quoting
|
||||
CREATE SCHEMA "A$AP Mob"
|
||||
CREATE TABLE "Dr. Bronner's ""Magic"" Soaps" (
|
||||
id bigint PRIMARY KEY,
|
||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
||||
);
|
||||
|
||||
\set insert_target '"A$AP Mob"."Dr. Bronner''s ""Magic"" Soaps"'
|
||||
|
||||
-- create proxy and save proxy table name
|
||||
SELECT create_insert_proxy_for_table(:'insert_target') AS proxy_tablename
|
||||
\gset
|
||||
|
||||
-- insert to proxy, relying on default value
|
||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
||||
|
||||
-- copy some rows into the proxy
|
||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
||||
2 dolor sit amet
|
||||
3 consectetur adipiscing elit
|
||||
4 sed do eiusmod
|
||||
5 tempor incididunt ut
|
||||
6 labore et dolore
|
||||
\.
|
||||
|
||||
-- verify rows were copied to target
|
||||
SELECT * FROM :insert_target ORDER BY id ASC;
|
||||
|
||||
-- and not to proxy
|
||||
SELECT count(*) FROM pg_temp.:"proxy_tablename";
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
-- test behavior with distributed table, (so no transaction)
|
||||
CREATE TABLE insert_target (
|
||||
id bigint PRIMARY KEY,
|
||||
data text NOT NULL DEFAULT 'lorem ipsum'
|
||||
);
|
||||
|
||||
-- squelch WARNINGs that contain worker_port
|
||||
SET client_min_messages TO ERROR;
|
||||
SET citus.shard_count TO 2;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
SELECT create_distributed_table('insert_target', 'id', 'hash');
|
||||
|
||||
CREATE TEMPORARY SEQUENCE rows_inserted;
|
||||
SELECT create_insert_proxy_for_table('insert_target', 'rows_inserted') AS proxy_tablename
|
||||
\gset
|
||||
|
||||
-- insert to proxy, again relying on default value
|
||||
INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
|
||||
|
||||
-- test copy with bad row in middle
|
||||
\set VERBOSITY terse
|
||||
COPY pg_temp.:"proxy_tablename" FROM stdin;
|
||||
2 dolor sit amet
|
||||
3 consectetur adipiscing elit
|
||||
4 sed do eiusmod
|
||||
5 tempor incididunt ut
|
||||
6 labore et dolore
|
||||
7 \N
|
||||
8 magna aliqua
|
||||
\.
|
||||
\set VERBOSITY default
|
||||
|
||||
-- verify rows were copied to distributed table
|
||||
SELECT * FROM insert_target ORDER BY id ASC;
|
||||
|
||||
-- the counter should match the number of rows stored
|
||||
SELECT currval('rows_inserted');
|
||||
|
||||
SET client_min_messages TO DEFAULT;
|
Loading…
Reference in New Issue